You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/13 03:25:32 UTC
svn commit: r1299958 [2/2] - in /incubator/flume/trunk: ./
flume-ng-channels/flume-file-channel/ flume-ng-channels/flume-jdbc-channel/
flume-ng-clients/flume-ng-log4jappender/
flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clien...
Added: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.api;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.junit.Test;
+
+import org.apache.avro.ipc.Server;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.event.EventBuilder;
+
+import org.apache.flume.api.RpcTestUtils.FailedAvroHandler;
+import org.apache.flume.api.RpcTestUtils.OKAvroHandler;
+import org.apache.flume.api.RpcTestUtils.ThrowingAvroHandler;
+import org.apache.flume.api.RpcTestUtils.UnknownAvroHandler;
+import org.junit.Assert;
+
+/**
+ *
+ */
+public class TestNettyAvroRpcClient {
+
+ private static final Logger logger =
+ Logger.getLogger(TestNettyAvroRpcClient.class.getName());
+
+ private static final String localhost = "localhost";
+
+ /**
+ * Simple request
+ * @throws FlumeException
+ * @throws EventDeliveryException
+ */
+ @Test
+ public void testOKServerSimple() throws FlumeException,
+ EventDeliveryException {
+ RpcTestUtils.handlerSimpleAppendTest(new OKAvroHandler());
+ }
+
+ /**
+ * Simple batch request
+ * @throws FlumeException
+ * @throws EventDeliveryException
+ */
+ @Test
+ public void testOKServerBatch() throws FlumeException,
+ EventDeliveryException {
+ RpcTestUtils.handlerBatchAppendTest(new OKAvroHandler());
+ }
+
+ /**
+ * Try to connect to a closed port.
+ * Note: this test tries to connect to port 1 on localhost.
+ * @throws FlumeException
+ */
+ @Test(expected=FlumeException.class)
+ public void testUnableToConnect() throws FlumeException {
+ NettyAvroRpcClient client = new NettyAvroRpcClient.Builder()
+ .hostname(localhost).port(1).build();
+ }
+
+ /**
+ * Send too many events at once. Should handle this case gracefully.
+ * @throws FlumeException
+ * @throws EventDeliveryException
+ */
+ @Test
+ public void testBatchOverrun() throws FlumeException, EventDeliveryException {
+
+ int batchSize = 10;
+ int moreThanBatchSize = batchSize + 1;
+ NettyAvroRpcClient client = null;
+ Server server = RpcTestUtils.startServer(new OKAvroHandler());
+ try {
+ client = new NettyAvroRpcClient.Builder()
+ .hostname(localhost).port(server.getPort()).batchSize(batchSize)
+ .build();
+
+ // send one more than the batch size
+ List<Event> events = new ArrayList<Event>();
+ for (int i = 0; i < moreThanBatchSize; i++) {
+ events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+ }
+ client.appendBatch(events);
+ } finally {
+ RpcTestUtils.stopServer(server);
+ if (client != null) client.close();
+ }
+ }
+
+ /**
+ * First connect the client, then shut down the server, then send a request.
+ * @throws FlumeException
+ * @throws EventDeliveryException
+ */
+ @Test(expected=EventDeliveryException.class)
+ public void testServerDisconnect() throws FlumeException,
+ EventDeliveryException {
+ NettyAvroRpcClient client = null;
+ Server server = RpcTestUtils.startServer(new OKAvroHandler());
+ try {
+ client = RpcTestUtils.getStockLocalClient(server.getPort());
+ server.close();
+ try {
+ server.join();
+ } catch (InterruptedException ex) {
+ logger.log(Level.WARNING, "Thread interrupted during join()", ex);
+ Thread.currentThread().interrupt();
+ }
+ try {
+ client.append(EventBuilder.withBody("hello", Charset.forName("UTF8")));
+ } finally {
+ Assert.assertFalse("Client should not be active", client.isActive());
+ }
+ } finally {
+ RpcTestUtils.stopServer(server);
+ if (client != null) client.close();
+ }
+ }
+
+ /**
+ * First connect the client, then close the client, then send a request.
+ * @throws FlumeException
+ * @throws EventDeliveryException
+ */
+ @Test(expected=EventDeliveryException.class)
+ public void testClientClosedRequest() throws FlumeException,
+ EventDeliveryException {
+ NettyAvroRpcClient client = null;
+ Server server = RpcTestUtils.startServer(new OKAvroHandler());
+ try {
+ client = RpcTestUtils.getStockLocalClient(server.getPort());
+ client.close();
+ Assert.assertFalse("Client should not be active", client.isActive());
+ System.out.println("Yaya! I am not active after client close!");
+ client.append(EventBuilder.withBody("hello", Charset.forName("UTF8")));
+ } finally {
+ RpcTestUtils.stopServer(server);
+ if (client != null) client.close();
+ }
+ }
+
+ /**
+ * Send an event to an online server that returns FAILED.
+ */
+ @Test(expected=EventDeliveryException.class)
+ public void testFailedServerSimple() throws FlumeException,
+ EventDeliveryException {
+
+ RpcTestUtils.handlerSimpleAppendTest(new FailedAvroHandler());
+ logger.severe("Failed: I should never have gotten here!");
+ }
+
+ /**
+ * Send an event to an online server that returns UNKNOWN.
+ */
+ @Test(expected=EventDeliveryException.class)
+ public void testUnknownServerSimple() throws FlumeException,
+ EventDeliveryException {
+
+ RpcTestUtils.handlerSimpleAppendTest(new UnknownAvroHandler());
+ logger.severe("Unknown: I should never have gotten here!");
+ }
+
+ /**
+ * Send an event to an online server that throws an exception.
+ */
+ @Test(expected=EventDeliveryException.class)
+ public void testThrowingServerSimple() throws FlumeException,
+ EventDeliveryException {
+
+ RpcTestUtils.handlerSimpleAppendTest(new ThrowingAvroHandler());
+ logger.severe("Throwing: I should never have gotten here!");
+ }
+
+ /**
+ * Send a batch of events to a server that returns FAILED.
+ */
+ @Test(expected=EventDeliveryException.class)
+ public void testFailedServerBatch() throws FlumeException,
+ EventDeliveryException {
+
+ RpcTestUtils.handlerBatchAppendTest(new FailedAvroHandler());
+ logger.severe("Failed: I should never have gotten here!");
+ }
+
+ /**
+ * Send a batch of events to a server that returns UNKNOWN.
+ */
+ @Test(expected=EventDeliveryException.class)
+ public void testUnknownServerBatch() throws FlumeException,
+ EventDeliveryException {
+
+ RpcTestUtils.handlerBatchAppendTest(new UnknownAvroHandler());
+ logger.severe("Unknown: I should never have gotten here!");
+ }
+
+ /**
+ * Send a batch of events to a server that always throws exceptions.
+ */
+ @Test(expected=EventDeliveryException.class)
+ public void testThrowingServerBatch() throws FlumeException,
+ EventDeliveryException {
+
+ RpcTestUtils.handlerBatchAppendTest(new ThrowingAvroHandler());
+ logger.severe("Throwing: I should never have gotten here!");
+ }
+
+}
Propchange: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.api;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.ipc.Server;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcTestUtils.OKAvroHandler;
+import org.junit.Test;
+
+import org.apache.flume.event.EventBuilder;
+
+/**
+ * Very light testing on the factory. The heavy testing is done on the test
+ * dedicated to the implementation.
+ */
+public class TestRpcClientFactory {
+
+ private static final String localhost = "localhost";
+
+ @Test
+ public void testTwoParamSimpleAppend() throws FlumeException,
+ EventDeliveryException {
+ RpcClient client = null;
+ Server server = RpcTestUtils.startServer(new OKAvroHandler());
+ try {
+ client = RpcClientFactory.getInstance(localhost, server.getPort());
+ client.append(EventBuilder.withBody("wheee!!!", Charset.forName("UTF8")));
+ } finally {
+ RpcTestUtils.stopServer(server);
+ if (client != null) client.close();
+ }
+ }
+
+ @Test
+ public void testThreeParamBatchAppend() throws FlumeException,
+ EventDeliveryException {
+ int batchSize = 7;
+ RpcClient client = null;
+ Server server = RpcTestUtils.startServer(new OKAvroHandler());
+ try {
+ client = RpcClientFactory.getInstance(localhost, server.getPort(),
+ batchSize);
+
+ List<Event> events = new ArrayList<Event>();
+ for (int i = 0; i < batchSize; i++) {
+ events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+ }
+ client.appendBatch(events);
+ } finally {
+ RpcTestUtils.stopServer(server);
+ if (client != null) client.close();
+ }
+ }
+
+ // we are supposed to handle this gracefully
+ @Test
+ public void testTwoParamBatchAppendOverflow() throws FlumeException,
+ EventDeliveryException {
+ RpcClient client = null;
+ Server server = RpcTestUtils.startServer(new OKAvroHandler());
+ try {
+ client = RpcClientFactory.getInstance(localhost, server.getPort());
+ int batchSize = client.getBatchSize();
+ int moreThanBatch = batchSize + 1;
+ List<Event> events = new ArrayList<Event>();
+ for (int i = 0; i < moreThanBatch; i++) {
+ events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+ }
+ client.appendBatch(events);
+ } finally {
+ RpcTestUtils.stopServer(server);
+ if (client != null) client.close();
+ }
+ }
+
+}
Propchange: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/event/TestEventBuilder.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/event/TestEventBuilder.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/event/TestEventBuilder.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/event/TestEventBuilder.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.event;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Event;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestEventBuilder {
+
+ @Test
+ public void testBody() {
+ Event e1 = EventBuilder.withBody("e1".getBytes());
+ Assert.assertNotNull(e1);
+ Assert.assertArrayEquals("body is correct", "e1".getBytes(), e1.getBody());
+
+ Event e2 = EventBuilder.withBody(Long.valueOf(2).toString().getBytes());
+ Assert.assertNotNull(e2);
+ Assert.assertArrayEquals("body is correct", Long.valueOf(2L).toString()
+ .getBytes(), e2.getBody());
+ }
+
+ @Test
+ public void testHeaders() {
+ Map<String, String> headers = new HashMap<String, String>();
+
+ headers.put("one", "1");
+ headers.put("two", "2");
+
+ Event e1 = EventBuilder.withBody("e1".getBytes(), headers);
+
+ Assert.assertNotNull(e1);
+ Assert.assertArrayEquals("e1 has the proper body", "e1".getBytes(),
+ e1.getBody());
+ Assert.assertEquals("e1 has the proper headers", 2, e1.getHeaders().size());
+ Assert.assertEquals("e1 has a one key", "1", e1.getHeaders().get("one"));
+ }
+
+}
Propchange: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/event/TestEventBuilder.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/pom.xml Tue Mar 13 02:25:30 2012
@@ -43,6 +43,11 @@ limitations under the License.
<dependency>
<groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
</dependency>
Modified: incubator/flume/trunk/flume-ng-sinks/flume-irc-sink/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-irc-sink/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-irc-sink/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-irc-sink/pom.xml Tue Mar 13 02:25:30 2012
@@ -43,6 +43,11 @@ limitations under the License.
<dependency>
<groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
</dependency>
Modified: incubator/flume/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/pom.xml (original)
+++ incubator/flume/trunk/pom.xml Tue Mar 13 02:25:30 2012
@@ -49,6 +49,7 @@ limitations under the License.
<module>flume-ng-channels</module>
<module>flume-ng-legacy-sources</module>
<module>flume-ng-clients</module>
+ <module>flume-ng-sdk</module>
</modules>
<profiles>
@@ -436,7 +437,10 @@ limitations under the License.
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
- <version>1.6.1</version>
+ <configuration>
+ <stringType>String</stringType>
+ </configuration>
+ <version>1.6.2</version>
</plugin>
<plugin>
@@ -531,19 +535,19 @@ limitations under the License.
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
- <version>1.6.1</version>
+ <version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-compiler</artifactId>
- <version>1.6.1</version>
+ <version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
- <version>1.6.1</version>
+ <version>1.6.2</version>
</dependency>
<dependency>
@@ -644,6 +648,12 @@ limitations under the License.
<version>1.1.0-incubating-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ <version>1.1.0-incubating-SNAPSHOT</version>
+ </dependency>
+
</dependencies>
</dependencyManagement>