You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2012/07/07 00:30:04 UTC
svn commit: r1358457 - in /incubator/flume/branches/branch-1.2.0:
flume-ng-core/src/main/java/org/apache/flume/sink/
flume-ng-core/src/test/java/org/apache/flume/sink/ flume-ng-doc/sphinx/
flume-ng-sdk/src/main/java/org/apache/flume/api/ flume-ng-sdk/s...
Author: mpercy
Date: Fri Jul 6 22:30:04 2012
New Revision: 1358457
URL: http://svn.apache.org/viewvc?rev=1358457&view=rev
Log:
FLUME-1316. AvroSink should be configurable for connect-timeout and request-timeout.
(Mike Percy via Hari Shreedharan)
Modified:
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
incubator/flume/branches/branch-1.2.0/flume-ng-doc/sphinx/FlumeUserGuide.rst
incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1358457&r1=1358456&r2=1358457&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Fri Jul 6 22:30:04 2012
@@ -39,6 +39,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import java.util.Properties;
+import org.apache.flume.api.RpcClientConfigurationConstants;
/**
* <p>
@@ -74,27 +76,39 @@ import com.google.common.collect.Lists;
* <tr>
* <th>Parameter</th>
* <th>Description</th>
- * <th>Unit / Type</th>
+ * <th>Unit (data type)</th>
* <th>Default</th>
* </tr>
* <tr>
* <td><tt>hostname</tt></td>
* <td>The hostname to which events should be sent.</td>
- * <td>Hostname or IP / String</td>
+ * <td>Hostname or IP (String)</td>
* <td>none (required)</td>
* </tr>
* <tr>
* <td><tt>port</tt></td>
* <td>The port to which events should be sent on <tt>hostname</tt>.</td>
- * <td>TCP port / int</td>
+ * <td>TCP port (int)</td>
* <td>none (required)</td>
* </tr>
* <tr>
* <td><tt>batch-size</tt></td>
* <td>The maximum number of events to send per RPC.</td>
- * <td>events / int</td>
+ * <td>events (int)</td>
* <td>100</td>
* </tr>
+ * <tr>
+ * <td><tt>connect-timeout</tt></td>
+ * <td>Maximum time to wait for the first Avro handshake and RPC request</td>
+ * <td>milliseconds (long)</td>
+ * <td>20000</td>
+ * </tr>
+ * <tr>
+ * <td><tt>request-timeout</tt></td>
+ * <td>Maximum time to wait RPC requests after the first</td>
+ * <td>milliseconds (long)</td>
+ * <td>20000</td>
+ * </tr>
* </table>
* <p>
* <b>Metrics</b>
@@ -106,14 +120,13 @@ import com.google.common.collect.Lists;
public class AvroSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(AvroSink.class);
- private static final Integer defaultBatchSize = 100;
private String hostname;
private Integer port;
- private Integer batchSize;
private RpcClient client;
private CounterGroup counterGroup;
+ private Properties clientProps;
public AvroSink() {
counterGroup = new CounterGroup();
@@ -121,16 +134,37 @@ public class AvroSink extends AbstractSi
@Override
public void configure(Context context) {
+ clientProps = new Properties();
+
hostname = context.getString("hostname");
port = context.getInteger("port");
- batchSize = context.getInteger("batch-size");
- if (batchSize == null) {
- batchSize = defaultBatchSize;
- }
-
Preconditions.checkState(hostname != null, "No hostname specified");
Preconditions.checkState(port != null, "No port specified");
+
+ clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
+ clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX +
+ "h1", hostname + ":" + port);
+
+ Integer batchSize = context.getInteger("batch-size");
+ if (batchSize != null) {
+ clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
+ String.valueOf(batchSize));
+ }
+
+ Long connectTimeout = context.getLong("connect-timeout");
+ if (connectTimeout != null) {
+ clientProps.setProperty(
+ RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
+ String.valueOf(connectTimeout));
+ }
+
+ Long requestTimeout = context.getLong("request-timeout");
+ if (requestTimeout != null) {
+ clientProps.setProperty(
+ RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
+ String.valueOf(requestTimeout));
+ }
}
/**
@@ -141,11 +175,12 @@ public class AvroSink extends AbstractSi
private void createConnection() throws FlumeException {
if (client == null) {
- logger.debug("Avro sink {}: Building RpcClient with hostname: {}, " +
- "port: {}, batchSize: {}",
- new Object[] { getName(), hostname, port, batchSize });
+ logger.info("Avro sink {}: Building RpcClient with hostname: {}, " +
+ "port: {}",
+ new Object[] { getName(), hostname, port });
- client = RpcClientFactory.getDefaultInstance(hostname, port, batchSize);
+ client = RpcClientFactory.getInstance(clientProps);
+ logger.debug("Avro sink {}: Created RpcClient: {}", getName(), client);
}
}
@@ -195,9 +230,8 @@ public class AvroSink extends AbstractSi
try {
createConnection();
} catch (FlumeException e) {
- logger.warn("Unable to create avro client using hostname:" + hostname
- + ", port:" + port + ", batchSize: " + batchSize +
- ". Exception follows.", e);
+ logger.warn("Unable to create avro client using hostname: " + hostname
+ + ", port: " + port, e);
/* Try to prevent leaking resources. */
destroyConnection();
@@ -238,7 +272,7 @@ public class AvroSink extends AbstractSi
List<Event> batch = Lists.newLinkedList();
- for (int i = 0; i < batchSize; i++) {
+ for (int i = 0; i < client.getBatchSize(); i++) {
Event event = channel.take();
if (event == null) {
@@ -259,16 +293,19 @@ public class AvroSink extends AbstractSi
transaction.commit();
counterGroup.incrementAndGet("batch.success");
- } catch (ChannelException e) {
- transaction.rollback();
- logger.error("Avro Sink " + getName() + ": Unable to get event from" +
- " channel. Exception follows.", e);
- status = Status.BACKOFF;
-
- } catch (Exception ex) {
+ } catch (Throwable t) {
transaction.rollback();
- destroyConnection();
- throw new EventDeliveryException("Failed to send message", ex);
+ counterGroup.incrementAndGet("batch.failure");
+ if (t instanceof Error) {
+ throw (Error) t;
+ } else if (t instanceof ChannelException) {
+ logger.error("Avro Sink " + getName() + ": Unable to get event from" +
+ " channel " + channel.getName() + ". Exception follows.", t);
+ status = Status.BACKOFF;
+ } else {
+ destroyConnection();
+ throw new EventDeliveryException("Failed to send events", t);
+ }
} finally {
transaction.close();
}
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java?rev=1358457&r1=1358456&r2=1358457&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java Fri Jul 6 22:30:04 2012
@@ -23,7 +23,9 @@ import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Charsets;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Server;
@@ -52,7 +54,7 @@ public class TestAvroSink {
private static final Logger logger = LoggerFactory
.getLogger(TestAvroSink.class);
- private static final String hostname = "localhost";
+ private static final String hostname = "127.0.0.1";
private static final Integer port = 41414;
private AvroSink sink;
@@ -65,9 +67,11 @@ public class TestAvroSink {
Context context = new Context();
- context.put("hostname", "localhost");
- context.put("port", "41414");
- context.put("batch-size", "2");
+ context.put("hostname", hostname);
+ context.put("port", String.valueOf(port));
+ context.put("batch-size", String.valueOf(2));
+ context.put("connect-timeout", String.valueOf(2000L));
+ context.put("request-timeout", String.valueOf(3000L));
sink.setChannel(channel);
@@ -76,8 +80,9 @@ public class TestAvroSink {
}
@Test
- public void testLifecycle() throws InterruptedException {
- Server server = createServer();
+ public void testLifecycle() throws InterruptedException,
+ InstantiationException, IllegalAccessException {
+ Server server = createServer(new MockAvroServer());
server.start();
@@ -94,11 +99,10 @@ public class TestAvroSink {
@Test
public void testProcess() throws InterruptedException,
- EventDeliveryException {
+ EventDeliveryException, InstantiationException, IllegalAccessException {
- Event event = EventBuilder.withBody("test event 1".getBytes(),
- new HashMap<String, String>());
- Server server = createServer();
+ Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
+ Server server = createServer(new MockAvroServer());
server.start();
@@ -130,12 +134,67 @@ public class TestAvroSink {
}
@Test
+ public void testTimeout() throws InterruptedException,
+ EventDeliveryException, InstantiationException, IllegalAccessException {
+ Event event = EventBuilder.withBody("foo", Charsets.UTF_8);
+ AtomicLong delay = new AtomicLong();
+ Server server = createServer(new DelayMockAvroServer(delay));
+ server.start();
+ sink.start();
+ Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+ LifecycleState.START_OR_ERROR, 5000));
+
+ Transaction txn = channel.getTransaction();
+ txn.begin();
+ for (int i = 0; i < 4; i++) {
+ channel.put(event);
+ }
+ txn.commit();
+ txn.close();
+
+ // should throw EventDeliveryException due to connect timeout
+ delay.set(3000L); // because connect-timeout = 2000
+ boolean threw = false;
+ try {
+ sink.process();
+ } catch (EventDeliveryException ex) {
+ logger.info("Correctly threw due to connect timeout. Exception follows.",
+ ex);
+ threw = true;
+ }
+
+ Assert.assertTrue("Must throw due to connect timeout", threw);
+
+ // now, allow the connect handshake to occur
+ delay.set(0);
+ sink.process();
+
+ // should throw another EventDeliveryException due to request timeout
+ delay.set(4000L); // because request-timeout = 3000
+ threw = false;
+ try {
+ sink.process();
+ } catch (EventDeliveryException ex) {
+ logger.info("Correctly threw due to request timeout. Exception follows.",
+ ex);
+ threw = true;
+ }
+
+ Assert.assertTrue("Must throw due to request timeout", threw);
+
+ sink.stop();
+ Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+ LifecycleState.STOP_OR_ERROR, 5000));
+ server.close();
+ }
+
+ @Test
public void testFailedConnect() throws InterruptedException,
- EventDeliveryException {
+ EventDeliveryException, InstantiationException, IllegalAccessException {
Event event = EventBuilder.withBody("test event 1",
Charset.forName("UTF8"));
- Server server = createServer();
+ Server server = createServer(new MockAvroServer());
server.start();
sink.start();
@@ -166,7 +225,7 @@ public class TestAvroSink {
threwException);
}
- server = createServer();
+ server = createServer(new MockAvroServer());
server.start();
for (int i = 0; i < 5; i++) {
@@ -182,9 +241,10 @@ public class TestAvroSink {
server.close();
}
- private Server createServer() {
+ private Server createServer(AvroSourceProtocol protocol)
+ throws IllegalAccessException, InstantiationException {
Server server = new NettyServer(new SpecificResponder(
- AvroSourceProtocol.class, new MockAvroServer()), new InetSocketAddress(
+ AvroSourceProtocol.class, protocol), new InetSocketAddress(
hostname, port));
return server;
@@ -201,9 +261,40 @@ public class TestAvroSink {
@Override
public Status appendBatch(List<AvroFlumeEvent> events)
throws AvroRemoteException {
-
logger.debug("Received event batch:{}", events);
+ return Status.OK;
+ }
+
+ }
+
+ private static class DelayMockAvroServer implements AvroSourceProtocol {
+ private final AtomicLong delay;
+
+ public DelayMockAvroServer(AtomicLong delay) {
+ this.delay = delay;
+ }
+
+ private void sleep() throws AvroRemoteException {
+ try {
+ Thread.sleep(delay.get());
+ } catch (InterruptedException e) {
+ throw new AvroRemoteException("Interrupted while sleeping", e);
+ }
+ }
+
+ @Override
+ public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+ logger.debug("Received event:{}; delaying for {}ms", event, delay);
+ sleep();
+ return Status.OK;
+ }
+
+ @Override
+ public Status appendBatch(List<AvroFlumeEvent> events)
+ throws AvroRemoteException {
+ logger.debug("Received event batch:{}; delaying for {}ms", events, delay);
+ sleep();
return Status.OK;
}
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-doc/sphinx/FlumeUserGuide.rst
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-doc/sphinx/FlumeUserGuide.rst?rev=1358457&r1=1358456&r2=1358457&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-doc/sphinx/FlumeUserGuide.rst (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-doc/sphinx/FlumeUserGuide.rst Fri Jul 6 22:30:04 2012
@@ -856,6 +856,8 @@ Property Name Default Description
**hostname** -- The hostname or IP address to bind to.
**port** -- The port # to listen on.
batch-size 100 number of event to batch together for send.
+connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
+request-timeout 20000 Amount of time (ms) to allow for requests after the first.
============== ======= ==============================================
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java?rev=1358457&r1=1358456&r2=1358457&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java Fri Jul 6 22:30:04 2012
@@ -80,25 +80,6 @@ implements RpcClient {
/**
* This constructor is intended to be called from {@link RpcClientFactory}.
- * @param address The InetSocketAddress to connect to
- * @param batchSize Maximum number of Events to accept in appendBatch()
- */
- protected NettyAvroRpcClient(InetSocketAddress address, Integer batchSize)
- throws FlumeException{
- if (address == null){
- logger.error("InetSocketAddress is null, cannot create client.");
- throw new NullPointerException("InetSocketAddress is null");
- }
- this.address = address;
- if(batchSize != null && batchSize > 0) {
- this.batchSize = batchSize;
- }
-
- connect();
- }
-
- /**
- * This constructor is intended to be called from {@link RpcClientFactory}.
* A call to this constructor should be followed by call to configure().
*/
protected NettyAvroRpcClient(){
@@ -183,6 +164,10 @@ implements RpcClient {
if (t instanceof Error) {
throw (Error) t;
}
+ if (t instanceof TimeoutException) {
+ throw new EventDeliveryException(this + ": Failed to send event. " +
+ "RPC request timed out after " + requestTimeout + "ms", t);
+ }
throw new EventDeliveryException(this + ": Failed to send event", t);
}
}
@@ -216,7 +201,8 @@ implements RpcClient {
try {
handshake.get(connectTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
- throw new EventDeliveryException(this + ": Handshake timed out", ex);
+ throw new EventDeliveryException(this + ": Handshake timed out after " +
+ connectTimeout + " ms", ex);
} catch (InterruptedException ex) {
throw new EventDeliveryException(this + ": Interrupted in handshake", ex);
} catch (ExecutionException ex) {
@@ -235,9 +221,7 @@ implements RpcClient {
@Override
public void appendBatch(List<Event> events) throws EventDeliveryException {
try {
- appendBatch(events, requestTimeout,
- TimeUnit.MILLISECONDS);
-
+ appendBatch(events, requestTimeout, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
// we mark as no longer active without trying to clean up resources
// client is required to call close() to clean up resources
@@ -245,6 +229,10 @@ implements RpcClient {
if (t instanceof Error) {
throw (Error) t;
}
+ if (t instanceof TimeoutException) {
+ throw new EventDeliveryException(this + ": Failed to send event. " +
+ "RPC request timed out after " + requestTimeout + " ms", t);
+ }
throw new EventDeliveryException(this + ": Failed to send batch", t);
}
}
@@ -289,7 +277,8 @@ implements RpcClient {
try {
handshake.get(connectTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
- throw new EventDeliveryException(this + ": Handshake timed out", ex);
+ throw new EventDeliveryException(this + ": Handshake timed out after " +
+ connectTimeout + "ms", ex);
} catch (InterruptedException ex) {
throw new EventDeliveryException(this + ": Interrupted in handshake",
ex);
@@ -342,7 +331,7 @@ implements RpcClient {
* {@link Condition} variable gets signaled reliably.
* Throws {@code IllegalStateException} when called to transition from CLOSED
* to another state.
- * @param state
+ * @param newState
*/
private void setState(ConnState newState) {
stateLock.lock();
@@ -426,14 +415,20 @@ implements RpcClient {
}
// batch size
- String strbatchSize = properties.getProperty(
+ String strBatchSize = properties.getProperty(
RpcClientConfigurationConstants.CONFIG_BATCH_SIZE);
+ logger.debug("Batch size string = " + strBatchSize);
batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
- if (strbatchSize != null && !strbatchSize.isEmpty()) {
+ if (strBatchSize != null && !strBatchSize.isEmpty()) {
try {
- batchSize = Integer.parseInt(strbatchSize);
+ int parsedBatch = Integer.parseInt(strBatchSize);
+ if (parsedBatch < 1) {
+ logger.warn("Invalid value for batchSize: {}; Using default value.", parsedBatch);
+ } else {
+ batchSize = parsedBatch;
+ }
} catch (NumberFormatException e) {
- logger.warn("Batchsize is not valid for RpcClient: " + strbatchSize +
+ logger.warn("Batchsize is not valid for RpcClient: " + strBatchSize +
". Default value assigned.", e);
}
}
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java?rev=1358457&r1=1358456&r2=1358457&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java Fri Jul 6 22:30:04 2012
@@ -69,10 +69,10 @@ public final class RpcClientConfiguratio
public final static Integer DEFAULT_BATCH_SIZE = 100;
/**
- * Default connection timeout in milliseconds.
+ * Default connection, handshake, and initial request timeout in milliseconds.
*/
public final static long DEFAULT_CONNECT_TIMEOUT_MILLIS =
- TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS);
+ TimeUnit.MILLISECONDS.convert(20, TimeUnit.SECONDS);
/**
* Default request timeout in milliseconds.
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java?rev=1358457&r1=1358456&r2=1358457&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java Fri Jul 6 22:30:04 2012
@@ -15,9 +15,12 @@
*/
package org.apache.flume.api;
-import java.net.InetSocketAddress;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
import java.util.Properties;
-
import org.apache.flume.FlumeException;
/**
@@ -42,7 +45,6 @@ public class RpcClientFactory {
* @param properties The properties to instantiate the client with.
* @throws FlumeException
*/
-
@SuppressWarnings("unchecked")
public static RpcClient getInstance(Properties properties)
throws FlumeException {
@@ -86,6 +88,22 @@ public class RpcClientFactory {
}
/**
+ * Delegates to {@link #getInstance(Properties props)}, given a File path
+ * to a {@link Properties} file.
+ * @param propertiesFile Valid properties file
+ * @return RpcClient configured according to the given Properties file.
+ * @throws FileNotFoundException If the file cannot be found
+ * @throws IOException If there is an IO error
+ */
+ public static RpcClient getInstance(File propertiesFile)
+ throws FileNotFoundException, IOException {
+ Reader reader = new FileReader(propertiesFile);
+ Properties props = new Properties();
+ props.load(reader);
+ return getInstance(props);
+ }
+
+ /**
* Deprecated. Use
* {@link getDefaultInstance() getDefaultInstance(String, Integer)} instead.
* @throws FlumeException
@@ -128,10 +146,25 @@ public class RpcClientFactory {
*/
public static RpcClient getDefaultInstance(String hostname, Integer port,
Integer batchSize) throws FlumeException {
- NettyAvroRpcClient client = new NettyAvroRpcClient(
- new InetSocketAddress(hostname, port), batchSize);
- return client;
+ if (hostname == null) {
+ throw new NullPointerException("hostname must not be null");
+ }
+ if (port == null) {
+ throw new NullPointerException("port must not be null");
+ }
+ if (batchSize == null) {
+ throw new NullPointerException("batchSize must not be null");
+ }
+
+ Properties props = new Properties();
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1",
+ hostname + ":" + port.intValue());
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, batchSize.toString());
+ NettyAvroRpcClient client = new NettyAvroRpcClient();
+ client.configure(props);
+ return client;
}
public static enum ClientType {
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java?rev=1358457&r1=1358456&r2=1358457&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java Fri Jul 6 22:30:04 2012
@@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import junit.framework.Assert;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyServer;
@@ -99,8 +100,12 @@ public class RpcTestUtils {
* Helper method for constructing a Netty RPC client that talks to localhost.
*/
public static NettyAvroRpcClient getStockLocalClient(int port) {
- NettyAvroRpcClient client =
- new NettyAvroRpcClient(new InetSocketAddress("localhost", port), 0);
+ Properties props = new Properties();
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1",
+ "127.0.0.1" + ":" + port);
+ NettyAvroRpcClient client = new NettyAvroRpcClient();
+ client.configure(props);
return client;
}
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java?rev=1358457&r1=1358456&r2=1358457&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java Fri Jul 6 22:30:04 2012
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import org.junit.Test;
@@ -47,7 +48,7 @@ public class TestNettyAvroRpcClient {
private static final Logger logger = LoggerFactory
.getLogger(TestNettyAvroRpcClient.class);
- private static final String localhost = "localhost";
+ private static final String localhost = "127.0.0.1";
/**
* Simple request
@@ -79,8 +80,12 @@ public class TestNettyAvroRpcClient {
@Test(expected=FlumeException.class)
public void testUnableToConnect() throws FlumeException {
@SuppressWarnings("unused")
- NettyAvroRpcClient client = new NettyAvroRpcClient(
- new InetSocketAddress(localhost, 1), 0);
+ NettyAvroRpcClient client = new NettyAvroRpcClient();
+ Properties props = new Properties();
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "localhost");
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "localhost",
+ localhost + ":" + 1);
+ client.configure(props);
}
/**
@@ -95,9 +100,14 @@ public class TestNettyAvroRpcClient {
int moreThanBatchSize = batchSize + 1;
NettyAvroRpcClient client = null;
Server server = RpcTestUtils.startServer(new OKAvroHandler());
+ Properties props = new Properties();
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "localhost");
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "localhost",
+ localhost + ":" + server.getPort());
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, "" + batchSize);
try {
- client = new NettyAvroRpcClient(
- new InetSocketAddress(localhost, server.getPort()), batchSize);
+ client = new NettyAvroRpcClient();
+ client.configure(props);
// send one more than the batch size
List<Event> events = new ArrayList<Event>();