You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2012/07/05 23:19:03 UTC

svn commit: r1357932 - in /incubator/flume/trunk: flume-ng-core/src/main/java/org/apache/flume/sink/ flume-ng-core/src/test/java/org/apache/flume/sink/ flume-ng-doc/sphinx/ flume-ng-sdk/src/main/java/org/apache/flume/api/ flume-ng-sdk/src/test/java/org...

Author: hshreedharan
Date: Thu Jul  5 21:19:02 2012
New Revision: 1357932

URL: http://svn.apache.org/viewvc?rev=1357932&view=rev
Log:
FLUME-1316. AvroSink should be configurable for connect-timeout and request-timeout.

(Mike Percy via Hari Shreedharan)


Modified:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
    incubator/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1357932&r1=1357931&r2=1357932&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Thu Jul  5 21:19:02 2012
@@ -39,6 +39,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import java.util.Properties;
+import org.apache.flume.api.RpcClientConfigurationConstants;
 
 /**
  * <p>
@@ -74,27 +76,39 @@ import com.google.common.collect.Lists;
  * <tr>
  * <th>Parameter</th>
  * <th>Description</th>
- * <th>Unit / Type</th>
+ * <th>Unit (data type)</th>
  * <th>Default</th>
  * </tr>
  * <tr>
  * <td><tt>hostname</tt></td>
  * <td>The hostname to which events should be sent.</td>
- * <td>Hostname or IP / String</td>
+ * <td>Hostname or IP (String)</td>
  * <td>none (required)</td>
  * </tr>
  * <tr>
  * <td><tt>port</tt></td>
  * <td>The port to which events should be sent on <tt>hostname</tt>.</td>
- * <td>TCP port / int</td>
+ * <td>TCP port (int)</td>
  * <td>none (required)</td>
  * </tr>
  * <tr>
  * <td><tt>batch-size</tt></td>
  * <td>The maximum number of events to send per RPC.</td>
- * <td>events / int</td>
+ * <td>events (int)</td>
  * <td>100</td>
  * </tr>
+ * <tr>
+ * <td><tt>connect-timeout</tt></td>
+ * <td>Maximum time to wait for the first Avro handshake and RPC request</td>
+ * <td>milliseconds (long)</td>
+ * <td>20000</td>
+ * </tr>
+ * <tr>
+ * <td><tt>request-timeout</tt></td>
+ * <td>Maximum time to wait for RPC requests after the first</td>
+ * <td>milliseconds (long)</td>
+ * <td>20000</td>
+ * </tr>
  * </table>
  * <p>
  * <b>Metrics</b>
@@ -106,14 +120,13 @@ import com.google.common.collect.Lists;
 public class AvroSink extends AbstractSink implements Configurable {
 
   private static final Logger logger = LoggerFactory.getLogger(AvroSink.class);
-  private static final Integer defaultBatchSize = 100;
 
   private String hostname;
   private Integer port;
-  private Integer batchSize;
 
   private RpcClient client;
   private CounterGroup counterGroup;
+  private Properties clientProps;
 
   public AvroSink() {
     counterGroup = new CounterGroup();
@@ -121,16 +134,37 @@ public class AvroSink extends AbstractSi
 
   @Override
   public void configure(Context context) {
+    clientProps = new Properties();
+
     hostname = context.getString("hostname");
     port = context.getInteger("port");
 
-    batchSize = context.getInteger("batch-size");
-    if (batchSize == null) {
-      batchSize = defaultBatchSize;
-    }
-
     Preconditions.checkState(hostname != null, "No hostname specified");
     Preconditions.checkState(port != null, "No port specified");
+
+    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
+    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX +
+        "h1", hostname + ":" + port);
+
+    Integer batchSize = context.getInteger("batch-size");
+    if (batchSize != null) {
+      clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
+          String.valueOf(batchSize));
+    }
+
+    Long connectTimeout = context.getLong("connect-timeout");
+    if (connectTimeout != null) {
+      clientProps.setProperty(
+          RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
+          String.valueOf(connectTimeout));
+    }
+
+    Long requestTimeout = context.getLong("request-timeout");
+    if (requestTimeout != null) {
+      clientProps.setProperty(
+          RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
+          String.valueOf(requestTimeout));
+    }
   }
 
   /**
@@ -141,11 +175,12 @@ public class AvroSink extends AbstractSi
   private void createConnection() throws FlumeException {
 
     if (client == null) {
-      logger.debug("Avro sink {}: Building RpcClient with hostname: {}, " +
-          "port: {}, batchSize: {}",
-          new Object[] { getName(), hostname, port, batchSize });
+      logger.info("Avro sink {}: Building RpcClient with hostname: {}, " +
+          "port: {}",
+          new Object[] { getName(), hostname, port });
 
-       client = RpcClientFactory.getDefaultInstance(hostname, port, batchSize);
+       client = RpcClientFactory.getInstance(clientProps);
+       logger.debug("Avro sink {}: Created RpcClient: {}", getName(), client);
     }
 
   }
@@ -195,9 +230,8 @@ public class AvroSink extends AbstractSi
     try {
       createConnection();
     } catch (FlumeException e) {
-      logger.warn("Unable to create avro client using hostname:" + hostname
-          + ", port:" + port + ", batchSize: " + batchSize +
-          ". Exception follows.", e);
+      logger.warn("Unable to create avro client using hostname: " + hostname
+          + ", port: " + port, e);
 
       /* Try to prevent leaking resources. */
       destroyConnection();
@@ -238,7 +272,7 @@ public class AvroSink extends AbstractSi
 
       List<Event> batch = Lists.newLinkedList();
 
-      for (int i = 0; i < batchSize; i++) {
+      for (int i = 0; i < client.getBatchSize(); i++) {
         Event event = channel.take();
 
         if (event == null) {
@@ -259,16 +293,19 @@ public class AvroSink extends AbstractSi
       transaction.commit();
       counterGroup.incrementAndGet("batch.success");
 
-    } catch (ChannelException e) {
-      transaction.rollback();
-      logger.error("Avro Sink " + getName() + ": Unable to get event from" +
-          " channel. Exception follows.", e);
-      status = Status.BACKOFF;
-
-    } catch (Exception ex) {
+    } catch (Throwable t) {
       transaction.rollback();
-      destroyConnection();
-      throw new EventDeliveryException("Failed to send message", ex);
+      counterGroup.incrementAndGet("batch.failure");
+      if (t instanceof Error) {
+        throw (Error) t;
+      } else if (t instanceof ChannelException) {
+        logger.error("Avro Sink " + getName() + ": Unable to get event from" +
+            " channel " + channel.getName() + ". Exception follows.", t);
+        status = Status.BACKOFF;
+      } else {
+        destroyConnection();
+        throw new EventDeliveryException("Failed to send events", t);
+      }
     } finally {
       transaction.close();
     }

Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java?rev=1357932&r1=1357931&r2=1357932&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java Thu Jul  5 21:19:02 2012
@@ -23,7 +23,9 @@ import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Charsets;
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.Server;
@@ -52,7 +54,7 @@ public class TestAvroSink {
 
   private static final Logger logger = LoggerFactory
       .getLogger(TestAvroSink.class);
-  private static final String hostname = "localhost";
+  private static final String hostname = "127.0.0.1";
   private static final Integer port = 41414;
 
   private AvroSink sink;
@@ -65,9 +67,11 @@ public class TestAvroSink {
 
     Context context = new Context();
 
-    context.put("hostname", "localhost");
-    context.put("port", "41414");
-    context.put("batch-size", "2");
+    context.put("hostname", hostname);
+    context.put("port", String.valueOf(port));
+    context.put("batch-size", String.valueOf(2));
+    context.put("connect-timeout", String.valueOf(2000L));
+    context.put("request-timeout", String.valueOf(3000L));
 
     sink.setChannel(channel);
 
@@ -76,8 +80,9 @@ public class TestAvroSink {
   }
 
   @Test
-  public void testLifecycle() throws InterruptedException {
-    Server server = createServer();
+  public void testLifecycle() throws InterruptedException,
+      InstantiationException, IllegalAccessException {
+    Server server = createServer(new MockAvroServer());
 
     server.start();
 
@@ -94,11 +99,10 @@ public class TestAvroSink {
 
   @Test
   public void testProcess() throws InterruptedException,
-      EventDeliveryException {
+      EventDeliveryException, InstantiationException, IllegalAccessException {
 
-    Event event = EventBuilder.withBody("test event 1".getBytes(),
-        new HashMap<String, String>());
-    Server server = createServer();
+    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
+    Server server = createServer(new MockAvroServer());
 
     server.start();
 
@@ -130,12 +134,67 @@ public class TestAvroSink {
   }
 
   @Test
+  public void testTimeout() throws InterruptedException,
+      EventDeliveryException, InstantiationException, IllegalAccessException {
+    Event event = EventBuilder.withBody("foo", Charsets.UTF_8);
+    AtomicLong delay = new AtomicLong();
+    Server server = createServer(new DelayMockAvroServer(delay));
+    server.start();
+    sink.start();
+    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+        LifecycleState.START_OR_ERROR, 5000));
+
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    for (int i = 0; i < 4; i++) {
+      channel.put(event);
+    }
+    txn.commit();
+    txn.close();
+
+    // should throw EventDeliveryException due to connect timeout
+    delay.set(3000L); // because connect-timeout = 2000
+    boolean threw = false;
+    try {
+      sink.process();
+    } catch (EventDeliveryException ex) {
+      logger.info("Correctly threw due to connect timeout. Exception follows.",
+          ex);
+      threw = true;
+    }
+
+    Assert.assertTrue("Must throw due to connect timeout", threw);
+
+    // now, allow the connect handshake to occur
+    delay.set(0);
+    sink.process();
+
+    // should throw another EventDeliveryException due to request timeout
+    delay.set(4000L); // because request-timeout = 3000
+    threw = false;
+    try {
+      sink.process();
+    } catch (EventDeliveryException ex) {
+      logger.info("Correctly threw due to request timeout. Exception follows.",
+          ex);
+      threw = true;
+    }
+
+    Assert.assertTrue("Must throw due to request timeout", threw);
+
+    sink.stop();
+    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+        LifecycleState.STOP_OR_ERROR, 5000));
+    server.close();
+  }
+
+  @Test
   public void testFailedConnect() throws InterruptedException,
-      EventDeliveryException {
+      EventDeliveryException, InstantiationException, IllegalAccessException {
 
     Event event = EventBuilder.withBody("test event 1",
         Charset.forName("UTF8"));
-    Server server = createServer();
+    Server server = createServer(new MockAvroServer());
 
     server.start();
     sink.start();
@@ -166,7 +225,7 @@ public class TestAvroSink {
           threwException);
     }
 
-    server = createServer();
+    server = createServer(new MockAvroServer());
     server.start();
 
     for (int i = 0; i < 5; i++) {
@@ -182,9 +241,10 @@ public class TestAvroSink {
     server.close();
   }
 
-  private Server createServer() {
+  private Server createServer(AvroSourceProtocol protocol)
+      throws IllegalAccessException, InstantiationException {
     Server server = new NettyServer(new SpecificResponder(
-        AvroSourceProtocol.class, new MockAvroServer()), new InetSocketAddress(
+        AvroSourceProtocol.class, protocol), new InetSocketAddress(
         hostname, port));
 
     return server;
@@ -201,9 +261,40 @@ public class TestAvroSink {
     @Override
     public Status appendBatch(List<AvroFlumeEvent> events)
         throws AvroRemoteException {
-
       logger.debug("Received event batch:{}", events);
+      return Status.OK;
+    }
+
+  }
+
+  private static class DelayMockAvroServer implements AvroSourceProtocol {
 
+    private final AtomicLong delay;
+
+    public DelayMockAvroServer(AtomicLong delay) {
+      this.delay = delay;
+    }
+
+    private void sleep() throws AvroRemoteException {
+      try {
+        Thread.sleep(delay.get());
+      } catch (InterruptedException e) {
+        throw new AvroRemoteException("Interrupted while sleeping", e);
+      }
+    }
+
+    @Override
+    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+      logger.debug("Received event:{}; delaying for {}ms", event, delay);
+      sleep();
+      return Status.OK;
+    }
+
+    @Override
+    public Status appendBatch(List<AvroFlumeEvent> events)
+        throws AvroRemoteException {
+      logger.debug("Received event batch:{}; delaying for {}ms", events, delay);
+      sleep();
       return Status.OK;
     }
 

Modified: incubator/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst?rev=1357932&r1=1357931&r2=1357932&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst (original)
+++ incubator/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst Thu Jul  5 21:19:02 2012
@@ -856,6 +856,8 @@ Property Name   Default  Description
 **hostname**    --       The hostname or IP address to bind to.
 **port**        --       The port # to listen on.
 batch-size      100      number of event to batch together for send.
+connect-timeout 20000    Amount of time (ms) to allow for the first (handshake) request.
+request-timeout 20000    Amount of time (ms) to allow for requests after the first.
 ==============  =======  ==============================================
 
 

Modified: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java?rev=1357932&r1=1357931&r2=1357932&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java (original)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java Thu Jul  5 21:19:02 2012
@@ -80,25 +80,6 @@ implements RpcClient {
 
   /**
    * This constructor is intended to be called from {@link RpcClientFactory}.
-   * @param address The InetSocketAddress to connect to
-   * @param batchSize Maximum number of Events to accept in appendBatch()
-   */
-  protected NettyAvroRpcClient(InetSocketAddress address, Integer batchSize)
-      throws FlumeException{
-    if (address == null){
-      logger.error("InetSocketAddress is null, cannot create client.");
-      throw new NullPointerException("InetSocketAddress is null");
-    }
-    this.address = address;
-    if(batchSize != null && batchSize > 0) {
-      this.batchSize = batchSize;
-    }
-
-    connect();
-  }
-
-  /**
-   * This constructor is intended to be called from {@link RpcClientFactory}.
    * A call to this constructor should be followed by call to configure().
    */
   protected NettyAvroRpcClient(){
@@ -183,6 +164,10 @@ implements RpcClient {
       if (t instanceof Error) {
         throw (Error) t;
       }
+      if (t instanceof TimeoutException) {
+        throw new EventDeliveryException(this + ": Failed to send event. " +
+            "RPC request timed out after " + requestTimeout + "ms", t);
+      }
       throw new EventDeliveryException(this + ": Failed to send event", t);
     }
   }
@@ -216,7 +201,8 @@ implements RpcClient {
     try {
       handshake.get(connectTimeout, TimeUnit.MILLISECONDS);
     } catch (TimeoutException ex) {
-      throw new EventDeliveryException(this + ": Handshake timed out", ex);
+      throw new EventDeliveryException(this + ": Handshake timed out after " +
+          connectTimeout + " ms", ex);
     } catch (InterruptedException ex) {
       throw new EventDeliveryException(this + ": Interrupted in handshake", ex);
     } catch (ExecutionException ex) {
@@ -235,9 +221,7 @@ implements RpcClient {
   @Override
   public void appendBatch(List<Event> events) throws EventDeliveryException {
     try {
-      appendBatch(events, requestTimeout,
-          TimeUnit.MILLISECONDS);
-
+      appendBatch(events, requestTimeout, TimeUnit.MILLISECONDS);
     } catch (Throwable t) {
       // we mark as no longer active without trying to clean up resources
       // client is required to call close() to clean up resources
@@ -245,6 +229,10 @@ implements RpcClient {
       if (t instanceof Error) {
         throw (Error) t;
       }
+      if (t instanceof TimeoutException) {
+        throw new EventDeliveryException(this + ": Failed to send event. " +
+            "RPC request timed out after " + requestTimeout + " ms", t);
+      }
       throw new EventDeliveryException(this + ": Failed to send batch", t);
     }
   }
@@ -289,7 +277,8 @@ implements RpcClient {
       try {
         handshake.get(connectTimeout, TimeUnit.MILLISECONDS);
       } catch (TimeoutException ex) {
-        throw new EventDeliveryException(this + ": Handshake timed out", ex);
+        throw new EventDeliveryException(this + ": Handshake timed out after " +
+            connectTimeout + "ms", ex);
       } catch (InterruptedException ex) {
         throw new EventDeliveryException(this + ": Interrupted in handshake",
             ex);
@@ -342,7 +331,7 @@ implements RpcClient {
    * {@link Condition} variable gets signaled reliably.
    * Throws {@code IllegalStateException} when called to transition from CLOSED
    * to another state.
-   * @param state
+   * @param newState
    */
   private void setState(ConnState newState) {
     stateLock.lock();
@@ -426,14 +415,20 @@ implements RpcClient {
     }
 
     // batch size
-    String strbatchSize = properties.getProperty(
+    String strBatchSize = properties.getProperty(
         RpcClientConfigurationConstants.CONFIG_BATCH_SIZE);
+    logger.debug("Batch size string = " + strBatchSize);
     batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
-    if (strbatchSize != null && !strbatchSize.isEmpty()) {
+    if (strBatchSize != null && !strBatchSize.isEmpty()) {
       try {
-        batchSize = Integer.parseInt(strbatchSize);
+        int parsedBatch = Integer.parseInt(strBatchSize);
+        if (parsedBatch < 1) {
+          logger.warn("Invalid value for batchSize: {}; Using default value.", parsedBatch);
+        } else {
+          batchSize = parsedBatch;
+        }
       } catch (NumberFormatException e) {
-        logger.warn("Batchsize is not valid for RpcClient: " + strbatchSize +
+        logger.warn("Batchsize is not valid for RpcClient: " + strBatchSize +
             ". Default value assigned.", e);
       }
     }

Modified: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java?rev=1357932&r1=1357931&r2=1357932&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java (original)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java Thu Jul  5 21:19:02 2012
@@ -69,10 +69,10 @@ public final class RpcClientConfiguratio
   public final static Integer DEFAULT_BATCH_SIZE = 100;
 
   /**
-   * Default connection timeout in milliseconds.
+   * Default connection, handshake, and initial request timeout in milliseconds.
    */
   public final static long DEFAULT_CONNECT_TIMEOUT_MILLIS =
-      TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS);
+      TimeUnit.MILLISECONDS.convert(20, TimeUnit.SECONDS);
 
   /**
    * Default request timeout in milliseconds.

Modified: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java?rev=1357932&r1=1357931&r2=1357932&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java (original)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java Thu Jul  5 21:19:02 2012
@@ -15,9 +15,12 @@
  */
 package org.apache.flume.api;
 
-import java.net.InetSocketAddress;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
 import java.util.Properties;
-
 import org.apache.flume.FlumeException;
 
 /**
@@ -42,7 +45,6 @@ public class RpcClientFactory {
    * @param properties The properties to instantiate the client with.
    * @throws FlumeException
    */
-
   @SuppressWarnings("unchecked")
   public static RpcClient getInstance(Properties properties)
       throws FlumeException {
@@ -86,6 +88,22 @@ public class RpcClientFactory {
   }
 
   /**
+   * Delegates to {@link #getInstance(Properties props)}, given a File path
+   * to a {@link Properties} file.
+   * @param propertiesFile Valid properties file
+   * @return RpcClient configured according to the given Properties file.
+   * @throws FileNotFoundException If the file cannot be found
+   * @throws IOException If there is an IO error
+   */
+  public static RpcClient getInstance(File propertiesFile)
+      throws FileNotFoundException, IOException {
+    Reader reader = new FileReader(propertiesFile);
+    Properties props = new Properties();
+    props.load(reader);
+    return getInstance(props);
+  }
+
+  /**
    * Deprecated. Use
    * {@link getDefaultInstance() getDefaultInstance(String, Integer)} instead.
    * @throws FlumeException
@@ -128,10 +146,25 @@ public class RpcClientFactory {
    */
   public static RpcClient getDefaultInstance(String hostname, Integer port,
       Integer batchSize) throws FlumeException {
-    NettyAvroRpcClient client = new NettyAvroRpcClient(
-        new InetSocketAddress(hostname, port), batchSize);
-    return client;
 
+    if (hostname == null) {
+      throw new NullPointerException("hostname must not be null");
+    }
+    if (port == null) {
+      throw new NullPointerException("port must not be null");
+    }
+    if (batchSize == null) {
+      throw new NullPointerException("batchSize must not be null");
+    }
+
+    Properties props = new Properties();
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1",
+        hostname + ":" + port.intValue());
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, batchSize.toString());
+    NettyAvroRpcClient client = new NettyAvroRpcClient();
+    client.configure(props);
+    return client;
   }
 
   public static enum ClientType {

Modified: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java?rev=1357932&r1=1357931&r2=1357932&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java (original)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java Thu Jul  5 21:19:02 2012
@@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import junit.framework.Assert;
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyServer;
@@ -99,8 +100,12 @@ public class RpcTestUtils {
    * Helper method for constructing a Netty RPC client that talks to localhost.
    */
   public static NettyAvroRpcClient getStockLocalClient(int port) {
-    NettyAvroRpcClient client =
-       new NettyAvroRpcClient(new InetSocketAddress("localhost", port), 0);
+    Properties props = new Properties();
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1",
+        "127.0.0.1" + ":" + port);
+    NettyAvroRpcClient client = new NettyAvroRpcClient();
+    client.configure(props);
 
     return client;
   }

Modified: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java?rev=1357932&r1=1357931&r2=1357932&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java (original)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java Thu Jul  5 21:19:02 2012
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import org.junit.Test;
 
@@ -47,7 +48,7 @@ public class TestNettyAvroRpcClient {
   private static final Logger logger = LoggerFactory
       .getLogger(TestNettyAvroRpcClient.class);
 
-  private static final String localhost = "localhost";
+  private static final String localhost = "127.0.0.1";
 
   /**
    * Simple request
@@ -79,8 +80,12 @@ public class TestNettyAvroRpcClient {
   @Test(expected=FlumeException.class)
   public void testUnableToConnect() throws FlumeException {
     @SuppressWarnings("unused")
-    NettyAvroRpcClient client = new NettyAvroRpcClient(
-        new InetSocketAddress(localhost, 1), 0);
+    NettyAvroRpcClient client = new NettyAvroRpcClient();
+    Properties props = new Properties();
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "localhost");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "localhost",
+        localhost + ":" + 1);
+    client.configure(props);
   }
 
   /**
@@ -95,9 +100,14 @@ public class TestNettyAvroRpcClient {
     int moreThanBatchSize = batchSize + 1;
     NettyAvroRpcClient client = null;
     Server server = RpcTestUtils.startServer(new OKAvroHandler());
+    Properties props = new Properties();
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "localhost");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "localhost",
+        localhost + ":" + server.getPort());
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, "" + batchSize);
     try {
-      client = new NettyAvroRpcClient(
-          new InetSocketAddress(localhost, server.getPort()), batchSize);
+      client = new NettyAvroRpcClient();
+      client.configure(props);
 
       // send one more than the batch size
       List<Event> events = new ArrayList<Event>();