You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/13 03:25:32 UTC

svn commit: r1299958 [2/2] - in /incubator/flume/trunk: ./ flume-ng-channels/flume-file-channel/ flume-ng-channels/flume-jdbc-channel/ flume-ng-clients/flume-ng-log4jappender/ flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clien...

Added: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.api;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.junit.Test;
+
+import org.apache.avro.ipc.Server;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.event.EventBuilder;
+
+import org.apache.flume.api.RpcTestUtils.FailedAvroHandler;
+import org.apache.flume.api.RpcTestUtils.OKAvroHandler;
+import org.apache.flume.api.RpcTestUtils.ThrowingAvroHandler;
+import org.apache.flume.api.RpcTestUtils.UnknownAvroHandler;
+import org.junit.Assert;
+
+/**
+ *
+ */
+public class TestNettyAvroRpcClient {
+
+  private static final Logger logger =
+      Logger.getLogger(TestNettyAvroRpcClient.class.getName());
+
+  private static final String localhost = "localhost";
+
+  /**
+   * Simple request
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testOKServerSimple() throws FlumeException,
+      EventDeliveryException {
+    RpcTestUtils.handlerSimpleAppendTest(new OKAvroHandler());
+  }
+
+  /**
+   * Simple batch request
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testOKServerBatch() throws FlumeException,
+      EventDeliveryException {
+    RpcTestUtils.handlerBatchAppendTest(new OKAvroHandler());
+  }
+
+  /**
+   * Try to connect to a closed port.
+   * Note: this test tries to connect to port 1 on localhost.
+   * @throws FlumeException
+   */
+  @Test(expected=FlumeException.class)
+  public void testUnableToConnect() throws FlumeException {
+    NettyAvroRpcClient client = new NettyAvroRpcClient.Builder()
+        .hostname(localhost).port(1).build();
+  }
+
+  /**
+   * Send too many events at once. Should handle this case gracefully.
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testBatchOverrun() throws FlumeException, EventDeliveryException {
+
+    int batchSize = 10;
+    int moreThanBatchSize = batchSize + 1;
+    NettyAvroRpcClient client = null;
+    Server server = RpcTestUtils.startServer(new OKAvroHandler());
+    try {
+      client = new NettyAvroRpcClient.Builder()
+          .hostname(localhost).port(server.getPort()).batchSize(batchSize)
+          .build();
+
+      // send one more than the batch size
+      List<Event> events = new ArrayList<Event>();
+      for (int i = 0; i < moreThanBatchSize; i++) {
+        events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+      }
+      client.appendBatch(events);
+    } finally {
+      RpcTestUtils.stopServer(server);
+      if (client != null) client.close();
+    }
+  }
+
+  /**
+   * First connect the client, then shut down the server, then send a request.
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test(expected=EventDeliveryException.class)
+  public void testServerDisconnect() throws FlumeException,
+      EventDeliveryException {
+    NettyAvroRpcClient client = null;
+    Server server = RpcTestUtils.startServer(new OKAvroHandler());
+    try {
+      client = RpcTestUtils.getStockLocalClient(server.getPort());
+      server.close();
+      try {
+        server.join();
+      } catch (InterruptedException ex) {
+        logger.log(Level.WARNING, "Thread interrupted during join()", ex);
+        Thread.currentThread().interrupt();
+      }
+      try {
+        client.append(EventBuilder.withBody("hello", Charset.forName("UTF8")));
+      } finally {
+        Assert.assertFalse("Client should not be active", client.isActive());
+      }
+    } finally {
+      RpcTestUtils.stopServer(server);
+      if (client != null) client.close();
+    }
+  }
+
+  /**
+   * First connect the client, then close the client, then send a request.
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test(expected=EventDeliveryException.class)
+  public void testClientClosedRequest() throws FlumeException,
+      EventDeliveryException {
+    NettyAvroRpcClient client = null;
+    Server server = RpcTestUtils.startServer(new OKAvroHandler());
+    try {
+      client = RpcTestUtils.getStockLocalClient(server.getPort());
+      client.close();
+      Assert.assertFalse("Client should not be active", client.isActive());
+      System.out.println("Yaya! I am not active after client close!");
+      client.append(EventBuilder.withBody("hello", Charset.forName("UTF8")));
+    } finally {
+      RpcTestUtils.stopServer(server);
+      if (client != null) client.close();
+    }
+  }
+
+  /**
+   * Send an event to an online server that returns FAILED.
+   */
+  @Test(expected=EventDeliveryException.class)
+  public void testFailedServerSimple() throws FlumeException,
+      EventDeliveryException {
+
+    RpcTestUtils.handlerSimpleAppendTest(new FailedAvroHandler());
+    logger.severe("Failed: I should never have gotten here!");
+  }
+
+  /**
+   * Send an event to an online server that returns UNKNOWN.
+   */
+  @Test(expected=EventDeliveryException.class)
+  public void testUnknownServerSimple() throws FlumeException,
+      EventDeliveryException {
+
+    RpcTestUtils.handlerSimpleAppendTest(new UnknownAvroHandler());
+    logger.severe("Unknown: I should never have gotten here!");
+  }
+
+  /**
+   * Send an event to an online server that throws an exception.
+   */
+  @Test(expected=EventDeliveryException.class)
+  public void testThrowingServerSimple() throws FlumeException,
+      EventDeliveryException {
+
+    RpcTestUtils.handlerSimpleAppendTest(new ThrowingAvroHandler());
+    logger.severe("Throwing: I should never have gotten here!");
+  }
+
+  /**
+   * Send a batch of events to a server that returns FAILED.
+   */
+  @Test(expected=EventDeliveryException.class)
+  public void testFailedServerBatch() throws FlumeException,
+      EventDeliveryException {
+
+    RpcTestUtils.handlerBatchAppendTest(new FailedAvroHandler());
+    logger.severe("Failed: I should never have gotten here!");
+  }
+
+  /**
+   * Send a batch of events to a server that returns UNKNOWN.
+   */
+  @Test(expected=EventDeliveryException.class)
+  public void testUnknownServerBatch() throws FlumeException,
+      EventDeliveryException {
+
+    RpcTestUtils.handlerBatchAppendTest(new UnknownAvroHandler());
+    logger.severe("Unknown: I should never have gotten here!");
+  }
+
+  /**
+   * Send a batch of events to a server that always throws exceptions.
+   */
+  @Test(expected=EventDeliveryException.class)
+  public void testThrowingServerBatch() throws FlumeException,
+      EventDeliveryException {
+
+    RpcTestUtils.handlerBatchAppendTest(new ThrowingAvroHandler());
+    logger.severe("Throwing: I should never have gotten here!");
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.api;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.ipc.Server;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcTestUtils.OKAvroHandler;
+import org.junit.Test;
+
+import org.apache.flume.event.EventBuilder;
+
+/**
+ * Very light testing on the factory. The heavy testing is done on the test
+ * dedicated to the implementation.
+ */
+public class TestRpcClientFactory {
+
+  private static final String localhost = "localhost";
+
+  @Test
+  public void testTwoParamSimpleAppend() throws FlumeException,
+      EventDeliveryException {
+    RpcClient client = null;
+    Server server = RpcTestUtils.startServer(new OKAvroHandler());
+    try {
+      client = RpcClientFactory.getInstance(localhost, server.getPort());
+      client.append(EventBuilder.withBody("wheee!!!", Charset.forName("UTF8")));
+    } finally {
+      RpcTestUtils.stopServer(server);
+      if (client != null) client.close();
+    }
+  }
+
+  @Test
+  public void testThreeParamBatchAppend() throws FlumeException,
+      EventDeliveryException {
+    int batchSize = 7;
+    RpcClient client = null;
+    Server server = RpcTestUtils.startServer(new OKAvroHandler());
+    try {
+      client = RpcClientFactory.getInstance(localhost, server.getPort(),
+          batchSize);
+
+      List<Event> events = new ArrayList<Event>();
+      for (int i = 0; i < batchSize; i++) {
+        events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+      }
+      client.appendBatch(events);
+    } finally {
+      RpcTestUtils.stopServer(server);
+      if (client != null) client.close();
+    }
+  }
+
+  // we are supposed to handle this gracefully
+  @Test
+  public void testTwoParamBatchAppendOverflow() throws FlumeException,
+      EventDeliveryException {
+    RpcClient client = null;
+    Server server = RpcTestUtils.startServer(new OKAvroHandler());
+    try {
+      client = RpcClientFactory.getInstance(localhost, server.getPort());
+      int batchSize = client.getBatchSize();
+      int moreThanBatch = batchSize + 1;
+      List<Event> events = new ArrayList<Event>();
+      for (int i = 0; i < moreThanBatch; i++) {
+        events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+      }
+      client.appendBatch(events);
+    } finally {
+      RpcTestUtils.stopServer(server);
+      if (client != null) client.close();
+    }
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/event/TestEventBuilder.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/event/TestEventBuilder.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/event/TestEventBuilder.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/event/TestEventBuilder.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.event;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Event;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestEventBuilder {
+
+  @Test
+  public void testBody() {
+    Event e1 = EventBuilder.withBody("e1".getBytes());
+    Assert.assertNotNull(e1);
+    Assert.assertArrayEquals("body is correct", "e1".getBytes(), e1.getBody());
+
+    Event e2 = EventBuilder.withBody(Long.valueOf(2).toString().getBytes());
+    Assert.assertNotNull(e2);
+    Assert.assertArrayEquals("body is correct", Long.valueOf(2L).toString()
+        .getBytes(), e2.getBody());
+  }
+
+  @Test
+  public void testHeaders() {
+    Map<String, String> headers = new HashMap<String, String>();
+
+    headers.put("one", "1");
+    headers.put("two", "2");
+
+    Event e1 = EventBuilder.withBody("e1".getBytes(), headers);
+
+    Assert.assertNotNull(e1);
+    Assert.assertArrayEquals("e1 has the proper body", "e1".getBytes(),
+        e1.getBody());
+    Assert.assertEquals("e1 has the proper headers", 2, e1.getHeaders().size());
+    Assert.assertEquals("e1 has a one key", "1", e1.getHeaders().get("one"));
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/event/TestEventBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/pom.xml Tue Mar 13 02:25:30 2012
@@ -43,6 +43,11 @@ limitations under the License.
 
     <dependency>
       <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
     </dependency>
 

Modified: incubator/flume/trunk/flume-ng-sinks/flume-irc-sink/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-irc-sink/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-irc-sink/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-irc-sink/pom.xml Tue Mar 13 02:25:30 2012
@@ -43,6 +43,11 @@ limitations under the License.
 
     <dependency>
       <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
     </dependency>
 

Modified: incubator/flume/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/pom.xml (original)
+++ incubator/flume/trunk/pom.xml Tue Mar 13 02:25:30 2012
@@ -49,6 +49,7 @@ limitations under the License.
     <module>flume-ng-channels</module>
     <module>flume-ng-legacy-sources</module>
     <module>flume-ng-clients</module>
+    <module>flume-ng-sdk</module>
   </modules>
 
   <profiles>
@@ -436,7 +437,10 @@ limitations under the License.
         <plugin>
           <groupId>org.apache.avro</groupId>
           <artifactId>avro-maven-plugin</artifactId>
-          <version>1.6.1</version>
+          <configuration>
+            <stringType>String</stringType>
+          </configuration>
+          <version>1.6.2</version>
         </plugin>
 
         <plugin>
@@ -531,19 +535,19 @@ limitations under the License.
       <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro</artifactId>
-        <version>1.6.1</version>
+        <version>1.6.2</version>
       </dependency>
 
       <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro-compiler</artifactId>
-        <version>1.6.1</version>
+        <version>1.6.2</version>
       </dependency>
 
       <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro-ipc</artifactId>
-        <version>1.6.1</version>
+        <version>1.6.2</version>
       </dependency>
 
       <dependency>
@@ -644,6 +648,12 @@ limitations under the License.
         <version>1.1.0-incubating-SNAPSHOT</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-sdk</artifactId>
+        <version>1.1.0-incubating-SNAPSHOT</version>
+      </dependency>
+
     </dependencies>
   </dependencyManagement>