You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/02/11 19:55:40 UTC

git commit: FLUME-1898: Implement Thrift Source

Updated Branches:
  refs/heads/trunk 60da3d860 -> c35b7c947


FLUME-1898: Implement Thrift Source

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c35b7c94
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c35b7c94
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c35b7c94

Branch: refs/heads/trunk
Commit: c35b7c947915f7bce4da0b00938ec777d45fee31
Parents: 60da3d8
Author: Brock Noland <br...@apache.org>
Authored: Mon Feb 11 12:55:15 2013 -0600
Committer: Brock Noland <br...@apache.org>
Committed: Mon Feb 11 12:55:15 2013 -0600

----------------------------------------------------------------------
 .../flume/conf/source/SourceConfiguration.java     |    9 +-
 .../org/apache/flume/conf/source/SourceType.java   |    9 +-
 flume-ng-core/pom.xml                              |   12 +
 .../java/org/apache/flume/source/ThriftSource.java |  219 ++++++++++++
 .../org/apache/flume/source/TestThriftSource.java  |  276 +++++++++++++++
 5 files changed, 523 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
index 3312b04..7029615 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
@@ -193,7 +193,14 @@ public class SourceConfiguration extends ComponentConfiguration {
      *
      * @see org.apache.flume.source.http.HTTPSource
      */
-    HTTP("org.apache.flume.source.http.HTTPSourceConfiguration");
+    HTTP("org.apache.flume.source.http.HTTPSourceConfiguration"),
+
+    /**
+     * Thrift Source
+     *
+     * @see org.apache.flume.source.ThriftSource
+     */
+    THRIFT("org.apache.flume.source.http.ThriftSourceConfiguration");
 
     private String srcConfigurationName;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
index 058ca1c..a1bcd58 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
@@ -89,7 +89,14 @@ public enum SourceType {
    *
    * @see org.apache.flume.source.http.HTTPSource
    */
-  HTTP("org.apache.flume.source.http.HTTPSource");
+  HTTP("org.apache.flume.source.http.HTTPSource"),
+
+  /**
+   * Thrift source
+   *
+   * @see org.apache.flume.source.ThriftSource
+   */
+  THRIFT("org.apache.flume.source.ThriftSource");
 
   private final String sourceClassName;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-core/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index ba414bc..fa3999d 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -139,6 +139,13 @@ limitations under the License.
 
     <dependency>
       <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-configuration</artifactId>
     </dependency>
 
@@ -242,6 +249,11 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.mina</groupId>
       <artifactId>mina-core</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
new file mode 100644
index 0000000..979fd35
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.thrift.Status;
+import org.apache.flume.thrift.ThriftSourceProtocol;
+import org.apache.flume.thrift.ThriftFlumeEvent;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+public class ThriftSource extends AbstractSource implements Configurable,
+  EventDrivenSource {
+
+  public static final Logger logger = LoggerFactory.getLogger(ThriftSource
+    .class);
+  /**
+   * Config param for the maximum number of threads this source should use to
+   * handle incoming data.
+   */
+  public static final String CONFIG_THREADS = "threads";
+  /**
+   * Config param for the hostname to listen on.
+   */
+  public static final String CONFIG_BIND = "bind";
+  /**
+   * Config param for the port to listen on.
+   */
+  public static final String CONFIG_PORT = "port";
+  private Integer port;
+  private String bindAddress;
+  private int maxThreads = 0;
+  private SourceCounter sourceCounter;
+  private TServer server;
+  private TNonblockingServerSocket serverTransport;
+  private ExecutorService servingExecutor;
+
+  @Override
+  public void configure(Context context) {
+    logger.info("Configuring thrift source.");
+    port = context.getInteger(CONFIG_PORT);
+    Preconditions.checkNotNull(port, "Port must be specified for Thrift " +
+      "Source.");
+    bindAddress = context.getString(CONFIG_BIND);
+    Preconditions.checkNotNull(bindAddress, "Bind address must be specified " +
+      "for Thrift Source.");
+    // "threads" is optional; a bad value is logged and maxThreads is unchanged.
+    try {
+      maxThreads = context.getInteger(CONFIG_THREADS, 0);
+    } catch (NumberFormatException e) {
+      logger.warn("Thrift source\'s \"threads\" property must specify an " +
+        "integer value: " + context.getString(CONFIG_THREADS));
+    }
+    // Create the counter only once so repeated configure() calls reuse it.
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
+  }
+
+  @Override
+  public void start() {
+    logger.info("Starting thrift source");
+    ExecutorService sourceService;
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
+      "Flume Thrift IPC Thread %d").build();
+    if (maxThreads == 0) {
+      sourceService = Executors.newCachedThreadPool(threadFactory);
+    } else {
+      sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
+    }
+    try {
+      serverTransport = new TNonblockingServerSocket(new InetSocketAddress
+        (bindAddress, port));
+    } catch (TTransportException e) {
+      throw new FlumeException("Failed to start Thrift Source.", e);
+    }
+    server = new TThreadedSelectorServer(
+      new TThreadedSelectorServer.Args(serverTransport).protocolFactory(
+        new TCompactProtocol.Factory()).processor(
+        new ThriftSourceProtocol.Processor<ThriftSourceHandler>(
+          new ThriftSourceHandler())).executorService(sourceService));
+    // One dedicated thread exists only to run the server's serve() call.
+    servingExecutor = Executors.newSingleThreadExecutor(new
+      ThreadFactoryBuilder().setNameFormat("Flume Thrift Source I/O Boss")
+      .build());
+    /*
+     * Run serve() on the dedicated thread; readiness is polled below.
+     */
+    servingExecutor.submit(new Runnable() {
+      @Override
+      public void run() {
+        server.serve();
+      }
+    });
+    // Poll once a second; fail if the server is not serving after 10 seconds.
+    long timeAfterStart = System.currentTimeMillis();
+    while(!server.isServing()) {
+      try {
+        if(System.currentTimeMillis() - timeAfterStart >=10000) {
+          throw new FlumeException("Thrift server failed to start!");
+        }
+        TimeUnit.MILLISECONDS.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new FlumeException("Interrupted while waiting for Thrift server" +
+          " to start.", e);
+      }
+    }
+    sourceCounter.start();
+    logger.info("Started Thrift source.");
+    super.start();
+  }
+
+  public void stop() {
+    if(server != null && server.isServing()) {
+      server.stop(); // Thrift will shutdown the executor passed to it.
+    }
+    servingExecutor.shutdown();
+    try {
+      if(!servingExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        servingExecutor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the flag, as start() does
+      throw new FlumeException("Interrupted while waiting for server to be " +
+        "shutdown.", e);
+    }
+    sourceCounter.stop();
+    super.stop();
+  }
+
+  private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {
+
+    @Override
+    public Status append(ThriftFlumeEvent event) throws TException {
+      Event flumeEvent = EventBuilder.withBody(event.getBody(),
+        event.getHeaders());
+      // Received counters are bumped before delivery; accepted ones only after.
+      sourceCounter.incrementAppendReceivedCount();
+      sourceCounter.incrementEventReceivedCount();
+      // A ChannelException is reported to the caller as FAILED, not rethrown.
+      try {
+        getChannelProcessor().processEvent(flumeEvent);
+      } catch (ChannelException ex) {
+        logger.warn("Thrift source " + getName() + " could not append events " +
+          "to the channel.", ex);
+        return Status.FAILED;
+      }
+      sourceCounter.incrementAppendAcceptedCount();
+      sourceCounter.incrementEventAcceptedCount();
+      return Status.OK;
+    }
+
+    @Override
+    public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
+      sourceCounter.incrementAppendBatchReceivedCount();
+      sourceCounter.addToEventReceivedCount(events.size());
+      // Convert every Thrift event (body plus headers) into a Flume event.
+      List<Event> flumeEvents = Lists.newArrayList();
+      for(ThriftFlumeEvent event : events) {
+        flumeEvents.add(EventBuilder.withBody(event.getBody(),
+          event.getHeaders()));
+      }
+      // A ChannelException is reported to the caller as FAILED, not rethrown.
+      try {
+        getChannelProcessor().processEventBatch(flumeEvents);
+      } catch (ChannelException ex) {
+        logger.warn("Thrift source " + getName() + " could not append events " +
+          "to the channel.", ex);
+        return Status.FAILED;
+      }
+      // Accepted counters are updated only after the whole batch succeeded.
+      sourceCounter.incrementAppendBatchAcceptedCount();
+      sourceCounter.addToEventAcceptedCount(events.size());
+      return Status.OK;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
new file mode 100644
index 0000000..357965f
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientConfigurationConstants;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class TestThriftSource {
+
+  private ThriftSource source;
+  private MemoryChannel channel;
+  private RpcClient client;
+  private final Random random = new Random();
+  private final Properties props = new Properties();
+  private int port;
+
+  @Before
+  public void setUp() {
+    port = random.nextInt(50000) + 1024;
+    props.clear();
+    props.setProperty("hosts", "h1");
+    props.setProperty("hosts.h1", "0.0.0.0:"+ String.valueOf(port));
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, "10");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
+      "2000");
+    channel = new MemoryChannel();
+    source = new ThriftSource();
+  }
+
+  @After
+  public void stop() throws Exception {
+    source.stop();
+  }
+
+  private void configureSource() {
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+  }
+
+
+  @Test
+  public void testAppend() throws Exception {
+    client = RpcClientFactory.getThriftInstance(props);
+    Context context = new Context();
+    channel.configure(context);
+    configureSource();
+    context.put(ThriftSource.CONFIG_BIND, "0.0.0.0");
+    context.put(ThriftSource.CONFIG_PORT, String.valueOf(port));
+    Configurables.configure(source, context);
+    source.start();
+    for(int i = 0; i < 30; i++) {
+      client.append(EventBuilder.withBody(String.valueOf(i).getBytes()));
+    }
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+
+    for (int i = 0; i < 30; i++) {
+      Event event = channel.take();
+      Assert.assertNotNull(event);
+      Assert.assertEquals(String.valueOf(i), new String(event.getBody()));
+    }
+    transaction.commit();
+    transaction.close();
+  }
+
+  @Test
+  public void testAppendBatch() throws Exception {
+    client = RpcClientFactory.getThriftInstance(props);
+    Context context = new Context();
+    context.put("capacity", "1000");
+    context.put("transactionCapacity", "1000");
+    channel.configure(context);
+    configureSource();
+    context.put(ThriftSource.CONFIG_BIND, "0.0.0.0");
+    context.put(ThriftSource.CONFIG_PORT, String.valueOf(port));
+    Configurables.configure(source, context);
+    source.start();
+    for (int i = 0; i < 30; i++) {
+      List<Event> events = Lists.newArrayList();
+      for (int j = 0; j < 10; j++) {
+        Map<String, String> hdrs = Maps.newHashMap();
+        hdrs.put("time", String.valueOf(System.currentTimeMillis()));
+        events.add(EventBuilder.withBody(String.valueOf(i).getBytes(), hdrs));
+      }
+      client.appendBatch(events);
+    }
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    long after = System.currentTimeMillis();
+    List<Integer> events = Lists.newArrayList();
+    for (int i = 0; i < 300; i++) {
+      Event event = channel.take();
+      Assert.assertNotNull(event);
+      Assert.assertTrue(Long.valueOf(event.getHeaders().get("time")) < after);
+      events.add(Integer.parseInt(new String(event.getBody())));
+    }
+    transaction.commit();
+    transaction.close();
+
+    Collections.sort(events);
+
+    int index = 0;
+    //30 batches of 10
+    for(int i = 0; i < 30; i++) {
+      for(int j = 0; j < 10; j++) {
+        Assert.assertEquals(i, events.get(index++).intValue());
+      }
+    }
+  }
+
+  @Test
+  public void testAppendBigBatch() throws Exception {
+    client = RpcClientFactory.getThriftInstance(props);
+    Context context = new Context();
+    context.put("capacity", "3000");
+    context.put("transactionCapacity", "3000");
+    channel.configure(context);
+    configureSource();
+    context.put(ThriftSource.CONFIG_BIND, "0.0.0.0");
+    context.put(ThriftSource.CONFIG_PORT, String.valueOf(port));
+    Configurables.configure(source, context);
+    source.start();
+    for (int i = 0; i < 5; i++) {
+      List<Event> events = Lists.newArrayList();
+      for (int j = 0; j < 500; j++) {
+        Map<String, String> hdrs = Maps.newHashMap();
+        hdrs.put("time", String.valueOf(System.currentTimeMillis()));
+        events.add(EventBuilder.withBody(String.valueOf(i).getBytes(), hdrs));
+      }
+      client.appendBatch(events);
+    }
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    long after = System.currentTimeMillis();
+    List<Integer> events = Lists.newArrayList();
+    for (int i = 0; i < 2500; i++) {
+      Event event = channel.take();
+      Assert.assertNotNull(event);
+      Assert.assertTrue(Long.valueOf(event.getHeaders().get("time")) < after);
+      events.add(Integer.parseInt(new String(event.getBody())));
+    }
+    transaction.commit();
+    transaction.close();
+
+    Collections.sort(events);
+
+    int index = 0;
+    //5 batches of 500
+    for(int i = 0; i < 5; i++) {
+      for(int j = 0; j < 500; j++) {
+        Assert.assertEquals(i, events.get(index++).intValue());
+      }
+    }
+  }
+
+  @Test
+  public void testMultipleClients() throws Exception {
+    ExecutorService submitter = Executors.newCachedThreadPool();
+    client = RpcClientFactory.getThriftInstance(props);
+    Context context = new Context();
+    context.put("capacity", "1000");
+    context.put("transactionCapacity", "1000");
+    channel.configure(context);
+    configureSource();
+    context.put(ThriftSource.CONFIG_BIND, "0.0.0.0");
+    context.put(ThriftSource.CONFIG_PORT, String.valueOf(port));
+    Configurables.configure(source, context);
+    source.start();
+    ExecutorCompletionService<Void> completionService = new
+      ExecutorCompletionService(submitter);
+    for (int i = 0; i < 30; i++) {
+      completionService.submit(new SubmitHelper(i), null);
+    }
+    //wait for all threads to be done
+
+
+    for(int i = 0; i < 30; i++) {
+      completionService.take();
+    }
+
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    long after = System.currentTimeMillis();
+    List<Integer> events = Lists.newArrayList();
+    for (int i = 0; i < 300; i++) {
+      Event event = channel.take();
+      Assert.assertNotNull(event);
+      Assert.assertTrue(Long.valueOf(event.getHeaders().get("time")) < after);
+      events.add(Integer.parseInt(new String(event.getBody())));
+    }
+    transaction.commit();
+    transaction.close();
+
+    Collections.sort(events);
+
+    int index = 0;
+    //30 batches of 10
+    for(int i = 0; i < 30; i++) {
+      for(int j = 0; j < 10; j++) {
+        Assert.assertEquals(i, events.get(index++).intValue());
+      }
+    }
+  }
+
+  private class SubmitHelper implements Runnable {
+
+    private final int i;
+    public SubmitHelper(int i) {
+      this.i = i;
+    }
+    @Override
+    public void run() {
+      List<Event> events = Lists.newArrayList();
+      for (int j = 0; j < 10; j++) {
+        Map<String, String> hdrs = Maps.newHashMap();
+        hdrs.put("time", String.valueOf(System.currentTimeMillis()));
+        events.add(EventBuilder.withBody(String.valueOf(i).getBytes(), hdrs));
+      }
+      try {
+        client.appendBatch(events);
+      } catch (EventDeliveryException e) {
+        throw new FlumeException(e);
+      }
+    }
+  }
+}