You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/02/11 19:55:40 UTC
git commit: FLUME-1898: Implement Thrift Source
Updated Branches:
refs/heads/trunk 60da3d860 -> c35b7c947
FLUME-1898: Implement Thrift Source
(Hari Shreedharan via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c35b7c94
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c35b7c94
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c35b7c94
Branch: refs/heads/trunk
Commit: c35b7c947915f7bce4da0b00938ec777d45fee31
Parents: 60da3d8
Author: Brock Noland <br...@apache.org>
Authored: Mon Feb 11 12:55:15 2013 -0600
Committer: Brock Noland <br...@apache.org>
Committed: Mon Feb 11 12:55:15 2013 -0600
----------------------------------------------------------------------
.../flume/conf/source/SourceConfiguration.java | 9 +-
.../org/apache/flume/conf/source/SourceType.java | 9 +-
flume-ng-core/pom.xml | 12 +
.../java/org/apache/flume/source/ThriftSource.java | 219 ++++++++++++
.../org/apache/flume/source/TestThriftSource.java | 276 +++++++++++++++
5 files changed, 523 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
index 3312b04..7029615 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
@@ -193,7 +193,14 @@ public class SourceConfiguration extends ComponentConfiguration {
*
* @see org.apache.flume.source.http.HTTPSource
*/
- HTTP("org.apache.flume.source.http.HTTPSourceConfiguration");
+ HTTP("org.apache.flume.source.http.HTTPSourceConfiguration"),
+
+ /**
+ * Thrift Source
+ *
+ * @see org.apache.flume.source.ThriftSource
+ */
+ THRIFT("org.apache.flume.source.http.ThriftSourceConfiguration");
private String srcConfigurationName;
http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
index 058ca1c..a1bcd58 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
@@ -89,7 +89,14 @@ public enum SourceType {
*
* @see org.apache.flume.source.http.HTTPSource
*/
- HTTP("org.apache.flume.source.http.HTTPSource");
+ HTTP("org.apache.flume.source.http.HTTPSource"),
+
+ /**
+ * Thrift source
+ *
+ * @see org.apache.flume.source.ThriftSource
+ */
+ THRIFT("org.apache.flume.source.ThriftSource");
private final String sourceClassName;
http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-core/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index ba414bc..fa3999d 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -139,6 +139,13 @@ limitations under the License.
<dependency>
<groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
<artifactId>flume-ng-configuration</artifactId>
</dependency>
@@ -242,6 +249,11 @@ limitations under the License.
</dependency>
<dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
new file mode 100644
index 0000000..979fd35
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.thrift.Status;
+import org.apache.flume.thrift.ThriftSourceProtocol;
+import org.apache.flume.thrift.ThriftFlumeEvent;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+public class ThriftSource extends AbstractSource implements Configurable,
+ EventDrivenSource {
+
+ public static final Logger logger = LoggerFactory.getLogger(ThriftSource
+ .class);
+ /**
+ * Config param for the maximum number of threads this source should use to
+ * handle incoming data.
+ */
+ public static final String CONFIG_THREADS = "threads";
+ /**
+ * Config param for the hostname to listen on.
+ */
+ public static final String CONFIG_BIND = "bind";
+ /**
+ * Config param for the port to listen on.
+ */
+ public static final String CONFIG_PORT = "port";
+ private Integer port;
+ private String bindAddress;
+ private int maxThreads = 0;
+ private SourceCounter sourceCounter;
+ private TServer server;
+ private TNonblockingServerSocket serverTransport;
+ private ExecutorService servingExecutor;
+
+ @Override
+ public void configure(Context context) {
+ logger.info("Configuring thrift source.");
+ port = context.getInteger(CONFIG_PORT);
+ Preconditions.checkNotNull(port, "Port must be specified for Thrift " +
+ "Source.");
+ bindAddress = context.getString(CONFIG_BIND);
+ Preconditions.checkNotNull(bindAddress, "Bind address must be specified " +
+ "for Thrift Source.");
+
+ try {
+ maxThreads = context.getInteger(CONFIG_THREADS, 0);
+ } catch (NumberFormatException e) {
+ logger.warn("Thrift source\'s \"threads\" property must specify an " +
+ "integer value: " + context.getString(CONFIG_THREADS));
+ }
+
+ if (sourceCounter == null) {
+ sourceCounter = new SourceCounter(getName());
+ }
+ }
+
+ @Override
+ public void start() {
+ logger.info("Starting thrift source");
+ ExecutorService sourceService;
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
+ "Flume Thrift IPC Thread %d").build();
+ if (maxThreads == 0) {
+ sourceService = Executors.newCachedThreadPool(threadFactory);
+ } else {
+ sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
+ }
+ try {
+ serverTransport = new TNonblockingServerSocket(new InetSocketAddress
+ (bindAddress, port));
+ } catch (TTransportException e) {
+ throw new FlumeException("Failed to start Thrift Source.", e);
+ }
+ server = new TThreadedSelectorServer(
+ new TThreadedSelectorServer.Args(serverTransport).protocolFactory(
+ new TCompactProtocol.Factory()).processor(
+ new ThriftSourceProtocol.Processor<ThriftSourceHandler>(
+ new ThriftSourceHandler())).executorService(sourceService));
+
+ servingExecutor = Executors.newSingleThreadExecutor(new
+ ThreadFactoryBuilder().setNameFormat("Flume Thrift Source I/O Boss")
+ .build());
+ /**
+ * Start serving.
+ */
+ servingExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ server.serve();
+ }
+ });
+
+ long timeAfterStart = System.currentTimeMillis();
+ while(!server.isServing()) {
+ try {
+ if(System.currentTimeMillis() - timeAfterStart >=10000) {
+ throw new FlumeException("Thrift server failed to start!");
+ }
+ TimeUnit.MILLISECONDS.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new FlumeException("Interrupted while waiting for Thrift server" +
+ " to start.", e);
+ }
+ }
+ sourceCounter.start();
+ logger.info("Started Thrift source.");
+ super.start();
+ }
+
+ public void stop() {
+ if(server != null && server.isServing()) {
+ server.stop();
+ }
+ servingExecutor.shutdown();
+ try {
+ if(!servingExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ servingExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ throw new FlumeException("Interrupted while waiting for server to be " +
+ "shutdown.");
+ }
+ sourceCounter.stop();
+ // Thrift will shutdown the executor passed to it.
+ super.stop();
+ }
+
+ private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {
+
+ @Override
+ public Status append(ThriftFlumeEvent event) throws TException {
+ Event flumeEvent = EventBuilder.withBody(event.getBody(),
+ event.getHeaders());
+
+ sourceCounter.incrementAppendReceivedCount();
+ sourceCounter.incrementEventReceivedCount();
+
+ try {
+ getChannelProcessor().processEvent(flumeEvent);
+ } catch (ChannelException ex) {
+ logger.warn("Thrift source " + getName() + " could not append events " +
+ "to the channel.", ex);
+ return Status.FAILED;
+ }
+ sourceCounter.incrementAppendAcceptedCount();
+ sourceCounter.incrementEventAcceptedCount();
+ return Status.OK;
+ }
+
+ @Override
+ public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
+ sourceCounter.incrementAppendBatchReceivedCount();
+ sourceCounter.addToEventReceivedCount(events.size());
+
+ List<Event> flumeEvents = Lists.newArrayList();
+ for(ThriftFlumeEvent event : events) {
+ flumeEvents.add(EventBuilder.withBody(event.getBody(),
+ event.getHeaders()));
+ }
+
+ try {
+ getChannelProcessor().processEventBatch(flumeEvents);
+ } catch (ChannelException ex) {
+ logger.warn("Thrift source %s could not append events to the " +
+ "channel.", getName());
+ return Status.FAILED;
+ }
+
+ sourceCounter.incrementAppendBatchAcceptedCount();
+ sourceCounter.addToEventAcceptedCount(events.size());
+ return Status.OK;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
new file mode 100644
index 0000000..357965f
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientConfigurationConstants;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * End-to-end tests for {@link ThriftSource}: a Thrift {@link RpcClient} sends
+ * events to a source bound to a randomly chosen local port, and the events are
+ * then read back from a {@link MemoryChannel}.
+ */
+public class TestThriftSource {
+
+  private ThriftSource source;
+  private MemoryChannel channel;
+  private RpcClient client;
+  private final Random random = new Random();
+  private final Properties props = new Properties();
+  private int port;
+
+  @Before
+  public void setUp() {
+    port = random.nextInt(50000) + 1024;
+    props.clear();
+    props.setProperty("hosts", "h1");
+    props.setProperty("hosts.h1", "0.0.0.0:"+ String.valueOf(port));
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, "10");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
+        "2000");
+    channel = new MemoryChannel();
+    source = new ThriftSource();
+  }
+
+  /**
+   * Releases the client (if a test created one) and always stops the source.
+   */
+  @After
+  public void stop() throws Exception {
+    try {
+      if (client != null) {
+        client.close();
+      }
+    } finally {
+      source.stop();
+    }
+  }
+
+  /** Wires the source to the memory channel through a replicating selector. */
+  private void configureSource() {
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+  }
+
+  /** Builds a context with equal channel and transaction capacity. */
+  private Context channelContext(String capacity) {
+    Context context = new Context();
+    context.put("capacity", capacity);
+    context.put("transactionCapacity", capacity);
+    return context;
+  }
+
+  /**
+   * Configures the channel from the context, then adds the bind address and
+   * port to the same context, configures the source with it and starts it.
+   */
+  private void startSource(Context context) {
+    channel.configure(context);
+    configureSource();
+    context.put(ThriftSource.CONFIG_BIND, "0.0.0.0");
+    context.put(ThriftSource.CONFIG_PORT, String.valueOf(port));
+    Configurables.configure(source, context);
+    source.start();
+  }
+
+  /**
+   * Creates {@code size} events whose body is {@code value} and whose "time"
+   * header is the creation time in milliseconds.
+   */
+  private List<Event> buildBatch(int value, int size) {
+    List<Event> events = Lists.newArrayList();
+    for (int j = 0; j < size; j++) {
+      Map<String, String> hdrs = Maps.newHashMap();
+      hdrs.put("time", String.valueOf(System.currentTimeMillis()));
+      events.add(EventBuilder.withBody(String.valueOf(value).getBytes(), hdrs));
+    }
+    return events;
+  }
+
+  /**
+   * Takes {@code batches * batchSize} events from the channel and checks that
+   * each batch value 0..batches-1 arrived exactly {@code batchSize} times and
+   * that no event is stamped later than the moment verification began.
+   */
+  private void verifyBatches(int batches, int batchSize) {
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    long after = System.currentTimeMillis();
+    List<Integer> bodies = Lists.newArrayList();
+    for (int i = 0; i < batches * batchSize; i++) {
+      Event event = channel.take();
+      Assert.assertNotNull(event);
+      // "<=" rather than "<": creation and verification can fall within the
+      // same millisecond.
+      Assert.assertTrue(Long.parseLong(event.getHeaders().get("time")) <= after);
+      bodies.add(Integer.parseInt(new String(event.getBody())));
+    }
+    transaction.commit();
+    transaction.close();
+
+    // Arrival order across batches is not asserted, only the multiset.
+    Collections.sort(bodies);
+
+    int index = 0;
+    for (int i = 0; i < batches; i++) {
+      for (int j = 0; j < batchSize; j++) {
+        Assert.assertEquals(i, bodies.get(index++).intValue());
+      }
+    }
+  }
+
+  @Test
+  public void testAppend() throws Exception {
+    client = RpcClientFactory.getThriftInstance(props);
+    startSource(new Context());
+    for (int i = 0; i < 30; i++) {
+      client.append(EventBuilder.withBody(String.valueOf(i).getBytes()));
+    }
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+
+    for (int i = 0; i < 30; i++) {
+      Event event = channel.take();
+      Assert.assertNotNull(event);
+      Assert.assertEquals(String.valueOf(i), new String(event.getBody()));
+    }
+    transaction.commit();
+    transaction.close();
+  }
+
+  @Test
+  public void testAppendBatch() throws Exception {
+    client = RpcClientFactory.getThriftInstance(props);
+    startSource(channelContext("1000"));
+    //30 batches of 10
+    for (int i = 0; i < 30; i++) {
+      client.appendBatch(buildBatch(i, 10));
+    }
+    verifyBatches(30, 10);
+  }
+
+  @Test
+  public void testAppendBigBatch() throws Exception {
+    client = RpcClientFactory.getThriftInstance(props);
+    startSource(channelContext("3000"));
+    //5 batches of 500
+    for (int i = 0; i < 5; i++) {
+      client.appendBatch(buildBatch(i, 500));
+    }
+    verifyBatches(5, 500);
+  }
+
+  @Test
+  public void testMultipleClients() throws Exception {
+    ExecutorService submitter = Executors.newCachedThreadPool();
+    try {
+      client = RpcClientFactory.getThriftInstance(props);
+      startSource(channelContext("1000"));
+      ExecutorCompletionService<Void> completionService =
+          new ExecutorCompletionService<Void>(submitter);
+      for (int i = 0; i < 30; i++) {
+        completionService.submit(new SubmitHelper(i), null);
+      }
+      // Wait for all submitters; get() rethrows any failure from a worker so
+      // it is reported directly instead of as a missing event later on.
+      for (int i = 0; i < 30; i++) {
+        completionService.take().get();
+      }
+    } finally {
+      submitter.shutdown();
+    }
+
+    //30 batches of 10
+    verifyBatches(30, 10);
+  }
+
+  /** Sends one batch of ten events, all carrying the same body value. */
+  private class SubmitHelper implements Runnable {
+
+    private final int i;
+    public SubmitHelper(int i) {
+      this.i = i;
+    }
+    @Override
+    public void run() {
+      List<Event> events = buildBatch(i, 10);
+      try {
+        client.appendBatch(events);
+      } catch (EventDeliveryException e) {
+        throw new FlumeException(e);
+      }
+    }
+  }
+}