You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/05/23 01:24:07 UTC

[09/13] apex-malhar git commit: Flume source

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/sink/Server.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/sink/Server.java b/flume/src/main/java/com/datatorrent/flume/sink/Server.java
new file mode 100644
index 0000000..14d9ff4
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/sink/Server.java
@@ -0,0 +1,419 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.sink;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.flume.discovery.Discovery;
+import com.datatorrent.flume.discovery.Discovery.Service;
+import com.datatorrent.netlet.AbstractLengthPrependerClient;
+import com.datatorrent.netlet.AbstractServer;
+import com.datatorrent.netlet.EventLoop;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>
+ * Netlet server that advertises itself via {@link Discovery} and queues client requests.</p>
+ *
+ * @author Chetan Narsude <ch...@datatorrent.com>
+ * @since 0.9.2
+ */
+public class Server extends AbstractServer
+{
+  private final String id;
+  private final Discovery<byte[]> discovery;
+  private final long acceptedTolerance;
+
+  public Server(String id, Discovery<byte[]> discovery, long acceptedTolerance)
+  {
+    this.id = id;
+    this.discovery = discovery;
+    this.acceptedTolerance = acceptedTolerance;
+  }
+
+  @Override
+  public void handleException(Exception cce, EventLoop el)
+  {
+    logger.error("Server Error", cce);
+    Request r = new Request(Command.SERVER_ERROR, null)
+    {
+      @Override
+      public Slice getAddress()
+      {
+        throw new UnsupportedOperationException("Not supported yet.");
+      }
+
+      @Override
+      public int getEventCount()
+      {
+        throw new UnsupportedOperationException("Not supported yet.");
+      }
+
+      @Override
+      public int getIdleCount()
+      {
+        throw new UnsupportedOperationException("Not supported yet.");
+      }
+
+    };
+    synchronized (requests) {
+      requests.add(r);
+    }
+  }
+
+  private final Service<byte[]> service = new Service<byte[]>()
+  {
+    @Override
+    public String getHost()
+    {
+      return ((InetSocketAddress)getServerAddress()).getHostName();
+    }
+
+    @Override
+    public int getPort()
+    {
+      return ((InetSocketAddress)getServerAddress()).getPort();
+    }
+
+    @Override
+    public byte[] getPayload()
+    {
+      return null;
+    }
+
+    @Override
+    public String getId()
+    {
+      return id;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Server.Service{id=" + id + ", host=" + getHost() + ", port=" + getPort() + ", payload=" +
+          Arrays.toString(getPayload()) + '}';
+    }
+
+  };
+
+  @Override
+  public void unregistered(final SelectionKey key)
+  {
+    discovery.unadvertise(service);
+    super.unregistered(key);
+  }
+
+  @Override
+  public void registered(final SelectionKey key)
+  {
+    super.registered(key);
+    discovery.advertise(service);
+  }
+
+  public enum Command
+  {
+    ECHO((byte)0),
+    SEEK((byte)1),
+    COMMITTED((byte)2),
+    CHECKPOINTED((byte)3),
+    CONNECTED((byte)4),
+    DISCONNECTED((byte)5),
+    WINDOWED((byte)6),
+    SERVER_ERROR((byte)7);
+
+    Command(byte b)
+    {
+      this.ord = b;
+    }
+
+    public byte getOrdinal()
+    {
+      return ord;
+    }
+
+    /**
+     * Returns the command whose ordinal byte equals the given value.
+     *
+     * @param b ordinal byte, as returned by {@link #getOrdinal()}
+     * @return the matching command
+     * @throws IllegalArgumentException if no command is defined for {@code b}
+     */
+    public static Command getCommand(byte b)
+    {
+      Command c;
+      switch (b) {
+        case 0:
+          c = ECHO;
+          break;
+        case 1:
+          c = SEEK;
+          break;
+        case 2:
+          c = COMMITTED;
+          break;
+        case 3:
+          c = CHECKPOINTED;
+          break;
+        case 4:
+          c = CONNECTED;
+          break;
+        case 5:
+          c = DISCONNECTED;
+          break;
+        case 6:
+          c = WINDOWED;
+          break;
+        case 7:
+          c = SERVER_ERROR;
+          break;
+        default:
+          // %d, not %b: %b renders any non-null argument as "true" and hides the offending value
+          throw new IllegalArgumentException(String.format("No Command defined for ordinal %d", b));
+      }
+
+      assert (b == c.ord);
+      return c;
+    }
+
+    private final byte ord;
+  }
+
+  public final ArrayList<Request> requests = new ArrayList<Request>(4);
+
+  @Override
+  public ClientListener getClientConnection(SocketChannel sc, ServerSocketChannel ssc)
+  {
+    Client lClient = new Client();
+    lClient.connected();
+    return lClient;
+  }
+
+  public class Client extends AbstractLengthPrependerClient
+  {
+
+    @Override
+    public void onMessage(byte[] buffer, int offset, int size) // validates a request; echoes ECHO, queues the rest
+    {
+      if (size != Request.FIXED_SIZE) {
+        logger.warn("Invalid Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size),
+            key.channel());
+        return;
+      }
+      // Drop the request if its embedded timestamp plus acceptedTolerance is already in the past.
+      long requestTime = Server.readLong(buffer, offset + Request.TIME_OFFSET);
+      if (System.currentTimeMillis() > (requestTime + acceptedTolerance)) {
+        logger.warn("Expired Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size),
+            key.channel());
+        return;
+      }
+      // ECHO is answered at once with the same bytes; an unknown command byte is logged and dropped.
+      try {
+        if (Command.getCommand(buffer[offset]) == Command.ECHO) {
+          write(buffer, offset, size);
+          return;
+        }
+      } catch (IllegalArgumentException ex) {
+        logger.warn("Invalid Request Received: {} from {}!", Arrays.copyOfRange(buffer, offset, offset + size),
+            key.channel(), ex);
+        return;
+      }
+      // Every other command is appended to the server's requests list under that list's monitor.
+      Request r = Request.getRequest(buffer, offset, this);
+      synchronized (requests) {
+        requests.add(r);
+      }
+    }
+
+    @Override
+    public void disconnected() // queues a synthetic DISCONNECTED request for this client, then delegates to super
+    {
+      synchronized (requests) {
+        requests.add(Request.getRequest( // buffer below: the command byte followed by 8 zero bytes
+            new byte[] {Command.DISCONNECTED.getOrdinal(), 0, 0, 0, 0, 0, 0, 0, 0}, 0, this));
+      }
+      super.disconnected();
+    }
+
+    public boolean write(byte[] address, Slice event)
+    {
+      if (event.offset == 0 && event.length == event.buffer.length) {
+        return write(address, event.buffer);
+      }
+
+      // a better method would be to replace the write implementation and allow it to natively support writing slices
+      return write(address, event.toByteArray());
+    }
+
+  }
+
+  public abstract static class Request
+  {
+    public static final int FIXED_SIZE = 17;
+    public static final int TIME_OFFSET = 9;
+    public final Command type;
+    public final Client client;
+
+    public Request(Command type, Client client)
+    {
+      this.type = type;
+      this.client = client;
+    }
+
+    public abstract Slice getAddress();
+
+    public abstract int getEventCount();
+
+    public abstract int getIdleCount();
+
+    @Override
+    public String toString()
+    {
+      return "Request{" + "type=" + type + '}';
+    }
+
+    public static Request getRequest(final byte[] buffer, final int offset, Client client) // decodes one request
+    {
+      Command command = Command.getCommand(buffer[offset]); // first byte selects the command
+      switch (command) {
+        case WINDOWED: // carries two ints instead of an address
+          return new Request(Command.WINDOWED, client)
+          {
+            final int eventCount;
+            final int idleCount;
+
+            {
+              eventCount = Server.readInt(buffer, offset + 1); // 4 bytes right after the command byte
+              idleCount = Server.readInt(buffer, offset + 5); // the next 4 bytes
+            }
+
+            @Override
+            public Slice getAddress()
+            {
+              throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public int getEventCount()
+            {
+              return eventCount;
+            }
+
+            @Override
+            public int getIdleCount()
+            {
+              return idleCount;
+            }
+
+            @Override
+            public String toString()
+            {
+              return "Request{" + "type=" + type + ", eventCount=" + eventCount + ", idleCount=" + idleCount + '}';
+            }
+
+          };
+
+        default: // every other command carries an 8-byte address
+          return new Request(command, client)
+          {
+            final Slice address;
+
+            {
+              address = new Slice(buffer, offset + 1, 8); // a view onto the caller's buffer, not a copy of it
+            }
+
+            @Override
+            public Slice getAddress()
+            {
+              return address;
+            }
+
+            @Override
+            public int getEventCount()
+            {
+              throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public int getIdleCount()
+            {
+              throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public String toString()
+            {
+              return "Request{" + "type=" + type + ", address=" + address + '}';
+            }
+
+          };
+
+      }
+
+    }
+
+  }
+
+  public static int readInt(byte[] buffer, int offset) // decodes 4 bytes at offset, least significant byte first
+  {
+    return buffer[offset++] & 0xff
+           | (buffer[offset++] & 0xff) << 8
+           | (buffer[offset++] & 0xff) << 16
+           | (buffer[offset++] & 0xff) << 24;
+  }
+
+  public static void writeInt(byte[] buffer, int offset, int i) // stores i in 4 bytes, least significant first
+  {
+    buffer[offset++] = (byte)i;
+    buffer[offset++] = (byte)(i >>> 8);
+    buffer[offset++] = (byte)(i >>> 16);
+    buffer[offset++] = (byte)(i >>> 24);
+  }
+
+  public static long readLong(byte[] buffer, int offset) // decodes 8 bytes at offset, least significant byte first
+  {
+    return (long)buffer[offset++] & 0xff
+           | (long)(buffer[offset++] & 0xff) << 8
+           | (long)(buffer[offset++] & 0xff) << 16
+           | (long)(buffer[offset++] & 0xff) << 24
+           | (long)(buffer[offset++] & 0xff) << 32
+           | (long)(buffer[offset++] & 0xff) << 40
+           | (long)(buffer[offset++] & 0xff) << 48
+           | (long)(buffer[offset++] & 0xff) << 56;
+  }
+
+  public static void writeLong(byte[] buffer, int offset, long l) // stores l in 8 bytes, least significant first
+  {
+    buffer[offset++] = (byte)l;
+    buffer[offset++] = (byte)(l >>> 8);
+    buffer[offset++] = (byte)(l >>> 16);
+    buffer[offset++] = (byte)(l >>> 24);
+    buffer[offset++] = (byte)(l >>> 32);
+    buffer[offset++] = (byte)(l >>> 40);
+    buffer[offset++] = (byte)(l >>> 48);
+    buffer[offset++] = (byte)(l >>> 56);
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(Server.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/source/TestSource.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/source/TestSource.java b/flume/src/main/java/com/datatorrent/flume/source/TestSource.java
new file mode 100644
index 0000000..490ac35
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/source/TestSource.java
@@ -0,0 +1,248 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.source;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * <p>Flume source that replays the lines of a file at a fixed rate, rewriting their date and time fields.</p>
+ *
+ * @since 0.9.4
+ */
+public class TestSource extends AbstractSource implements EventDrivenSource, Configurable
+{
+  public static final String SOURCE_FILE = "sourceFile";
+  public static final String LINE_NUMBER = "lineNumber";
+  public static final String RATE = "rate";
+  public static final String PERCENT_PAST_EVENTS = "percentPastEvents";
+  static byte FIELD_SEPARATOR = 1;
+  static int DEF_PERCENT_PAST_EVENTS = 5;
+  public Timer emitTimer;
+  @Nonnull
+  String filePath;
+  int rate;
+  int numberOfPastEvents;
+  transient List<Row> cache;
+  private transient int startIndex;
+  private transient Random random;
+  private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+  private SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  public TestSource()
+  {
+    super();
+    this.rate = 2500;
+    this.numberOfPastEvents = DEF_PERCENT_PAST_EVENTS * 25;
+    this.random = new Random();
+
+  }
+
+  @Override
+  public void configure(Context context) // reads the settings and loads the source file into the cache
+  {
+    filePath = context.getString(SOURCE_FILE);
+    rate = context.getInteger(RATE, rate); // the current rate is passed as the default
+    int percentPastEvents = context.getInteger(PERCENT_PAST_EVENTS, DEF_PERCENT_PAST_EVENTS);
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(filePath));
+    try {
+      BufferedReader lineReader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
+      try {
+        buildCache(lineReader);
+      } finally {
+        lineReader.close();
+      }
+    } catch (FileNotFoundException e) {
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    // Only a non-default percentage is converted to a row count; otherwise the constructor's value is kept.
+    if (DEF_PERCENT_PAST_EVENTS != percentPastEvents) {
+      numberOfPastEvents = (int)(percentPastEvents / 100.0 * cache.size());
+    }
+  }
+
+  /**
+   * Starts a timer that, once per second, emits the next {@code rate} cached rows,
+   * wrapping around to the start of the cache whenever its end is reached.
+   */
+  @Override
+  public void start()
+  {
+    super.start();
+    emitTimer = new Timer();
+    final int cacheSize = cache.size();
+    if (cacheSize == 0) {
+      // With an empty cache the wrap-around loop below could never make progress and would spin forever.
+      logger.warn("No rows were read from {}; no events will be emitted", filePath);
+      return;
+    }
+    final ChannelProcessor channel = getChannelProcessor();
+    emitTimer.scheduleAtFixedRate(new TimerTask()
+    {
+      @Override
+      public void run()
+      {
+        int remaining = rate;
+        while (remaining > 0) {
+          int end = Math.min(cacheSize, startIndex + remaining);
+          processBatch(channel, cache.subList(startIndex, end));
+          remaining -= end - startIndex;
+          startIndex = end == cacheSize ? 0 : end;
+        }
+      }
+    }, 0, 1000);
+  }
+
+  private void processBatch(ChannelProcessor channelProcessor, List<Row> rows) // stamps and emits one batch
+  {
+    if (rows.isEmpty()) {
+      return;
+    }
+    // Pick up to numberOfPastEvents random positions in this batch to receive a past timestamp.
+    int noise = random.nextInt(numberOfPastEvents + 1);
+    Set<Integer> pastIndices = Sets.newHashSet();
+    for (int i = 0; i < noise; i++) {
+      pastIndices.add(random.nextInt(rows.size()));
+    }
+    // Past timestamps are drawn from the window between two days ago (low) and now (high).
+    Calendar calendar = Calendar.getInstance();
+    long high = calendar.getTimeInMillis();
+    calendar.add(Calendar.DATE, -2);
+    long low = calendar.getTimeInMillis();
+    // Date and time fields are overwritten in place inside each cached row's byte array.
+    // NOTE(review): rows shorter than the formatted values (e.g. blank lines) would make
+    // System.arraycopy throw below - confirm the input file never contains such rows.
+    List<Event> events = Lists.newArrayList();
+    for (int i = 0; i < rows.size(); i++) {
+      Row eventRow = rows.get(i);
+      if (pastIndices.contains(i)) {
+        long pastTime = (long)((Math.random() * (high - low)) + low);
+        byte[] pastDateField = dateFormat.format(pastTime).getBytes();
+        byte[] pastTimeField = timeFormat.format(pastTime).getBytes();
+
+        System.arraycopy(pastDateField, 0, eventRow.bytes, eventRow.dateFieldStart, pastDateField.length);
+        System.arraycopy(pastTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, pastTimeField.length);
+      } else {
+        calendar.setTimeInMillis(System.currentTimeMillis());
+        byte[] currentDateField = dateFormat.format(calendar.getTime()).getBytes();
+        byte[] currentTimeField = timeFormat.format(calendar.getTime()).getBytes();
+
+        System.arraycopy(currentDateField, 0, eventRow.bytes, eventRow.dateFieldStart, currentDateField.length);
+        System.arraycopy(currentTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, currentTimeField.length);
+      }
+      // headers carry the source file path and startIndex + i as the line number
+      HashMap<String, String> headers = new HashMap<String, String>(2);
+      headers.put(SOURCE_FILE, filePath);
+      headers.put(LINE_NUMBER, String.valueOf(startIndex + i));
+      events.add(EventBuilder.withBody(eventRow.bytes, headers));
+    }
+    channelProcessor.processEventBatch(events);
+  }
+
+  @Override
+  public void stop()
+  {
+    emitTimer.cancel();
+    super.stop();
+  }
+
+  private void buildCache(BufferedReader lineReader) throws IOException // loads every line into the cache
+  {
+    cache = Lists.newArrayListWithCapacity(rate);
+    // Each line is kept as raw bytes plus the start offsets of its second (date) and third (time) fields.
+    String line;
+    while ((line = lineReader.readLine()) != null) {
+      byte[] row = line.getBytes();
+      Row eventRow = new Row(row);
+      final int rowsize = row.length;
+      // sliceLengh ends up as the index of the first separator, or rowsize when there is none
+      /* guid */
+      int sliceLengh = -1;
+      while (++sliceLengh < rowsize) {
+        if (row[sliceLengh] == FIELD_SEPARATOR) {
+          break;
+        }
+      }
+      int recordStart = sliceLengh + 1; // first byte after the first separator
+      int pointer = sliceLengh + 1;
+      while (pointer < rowsize) {
+        if (row[pointer++] == FIELD_SEPARATOR) {
+          eventRow.dateFieldStart = recordStart; // set only when the second field is terminated
+          break;
+        }
+      }
+      // when a second separator was found, pointer now sits just past it
+      /* lets parse the date */
+      int dateStart = pointer;
+      while (pointer < rowsize) {
+        if (row[pointer++] == FIELD_SEPARATOR) {
+          eventRow.timeFieldStart = dateStart; // set only when the third field is terminated
+          break;
+        }
+      }
+      // offsets that were never set keep their default value of 0
+      cache.add(eventRow);
+    }
+  }
+
+  private static class Row
+  {
+    final byte[] bytes;
+    int dateFieldStart;
+    int timeFieldStart;
+//    boolean past;
+
+    Row(byte[] bytes)
+    {
+      this.bytes = bytes;
+    }
+
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(TestSource.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java b/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java
new file mode 100644
index 0000000..c416418
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java
@@ -0,0 +1,131 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>Storage wrapper that delegates every call to an {@link HDFSStorage} and logs it at debug level.</p>
+ *
+ * @author Chetan Narsude <ch...@datatorrent.com>
+ * @since 0.9.4
+ */
+public class DebugWrapper implements Storage, Configurable, Component<com.datatorrent.api.Context>
+{
+  HDFSStorage storage = new HDFSStorage();
+
+  @Override
+  public byte[] store(Slice bytes)
+  {
+    byte[] ret = null;
+
+    try {
+      ret = storage.store(bytes);
+    } finally {
+      logger.debug("storage.store(new byte[]{{}});", bytes);
+    }
+
+    return ret;
+  }
+
+  @Override
+  public byte[] retrieve(byte[] identifier)
+  {
+    byte[] ret = null;
+
+    try {
+      ret = storage.retrieve(identifier);
+    } finally {
+      logger.debug("storage.retrieve(new byte[]{{}});", identifier);
+    }
+
+    return ret;
+  }
+
+  @Override
+  public byte[] retrieveNext()
+  {
+    byte[] ret = null;
+    try {
+      ret = storage.retrieveNext();
+    } finally {
+      logger.debug("storage.retrieveNext();");
+    }
+
+    return ret;
+  }
+
+  @Override
+  public void clean(byte[] identifier)
+  {
+    try {
+      storage.clean(identifier);
+    } finally {
+      logger.debug("storage.clean(new byte[]{{}});", identifier);
+    }
+  }
+
+  @Override
+  public void flush()
+  {
+    try {
+      storage.flush();
+    } finally {
+      logger.debug("storage.flush();");
+    }
+  }
+
+  @Override
+  public void configure(Context cntxt)
+  {
+    try {
+      storage.configure(cntxt);
+    } finally {
+      logger.debug("storage.configure({});", cntxt);
+    }
+  }
+
+  @Override
+  public void setup(com.datatorrent.api.Context t1)
+  {
+    try {
+      storage.setup(t1);
+    } finally {
+      logger.debug("storage.setup({});", t1);
+    }
+
+  }
+
+  @Override
+  public void teardown()
+  {
+    try {
+      storage.teardown();
+    } finally {
+      logger.debug("storage.teardown();");
+    }
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(DebugWrapper.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java b/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java
new file mode 100644
index 0000000..59c7fd3
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java
@@ -0,0 +1,61 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Event;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>EventCodec that logs a RuntimeException raised while (de)serializing and returns null instead.</p>
+ *
+ * @author Chetan Narsude <ch...@datatorrent.com>
+ * @since 1.0.4
+ */
+public class ErrorMaskingEventCodec extends EventCodec
+{
+
+  @Override
+  public Object fromByteArray(Slice fragment)
+  {
+    try {
+      return super.fromByteArray(fragment);
+    } catch (RuntimeException re) {
+      logger.warn("Cannot deserialize event {}", fragment, re);
+    }
+
+    return null;
+  }
+
+  @Override
+  public Slice toByteArray(Event event)
+  {
+    try {
+      return super.toByteArray(event);
+    } catch (RuntimeException re) {
+      logger.warn("Cannot serialize event {}", event, re);
+    }
+
+    return null;
+  }
+
+
+  private static final Logger logger = LoggerFactory.getLogger(ErrorMaskingEventCodec.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java b/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java
new file mode 100644
index 0000000..03d0d87
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>Stream codec that serializes the headers and body of a Flume {@link Event} with Kryo.</p>
+ *
+ * @author Chetan Narsude <ch...@datatorrent.com>
+ * @since 0.9.4
+ */
+public class EventCodec implements StreamCodec<Event>
+{
+  private final transient Kryo kryo;
+
+  public EventCodec()
+  {
+    this.kryo = new Kryo();
+    this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+  }
+
+  @Override
+  public Object fromByteArray(Slice fragment)
+  {
+    ByteArrayInputStream is = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length);
+    Input input = new Input(is);
+
+    @SuppressWarnings("unchecked")
+    HashMap<String, String> headers = kryo.readObjectOrNull(input, HashMap.class);
+    byte[] body = kryo.readObjectOrNull(input, byte[].class);
+    return EventBuilder.withBody(body, headers);
+  }
+
+  @Override
+  public Slice toByteArray(Event event) // writes the headers, then the body, into a fresh byte array
+  {
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    Output output = new Output(os);
+    // Headers are written as HashMap.class, so any other Map implementation is copied into a HashMap first.
+    Map<String, String> headers = event.getHeaders();
+    if (headers != null && headers.getClass() != HashMap.class) {
+      HashMap<String, String> tmp = new HashMap<String, String>(headers.size());
+      tmp.putAll(headers);
+      headers = tmp;
+    }
+    kryo.writeObjectOrNull(output, headers, HashMap.class);
+    kryo.writeObjectOrNull(output, event.getBody(), byte[].class);
+    output.flush();
+    final byte[] bytes = os.toByteArray();
+    return new Slice(bytes, 0, bytes.length);
+  }
+
+  @Override
+  public int getPartition(Event o)
+  {
+    return o.hashCode();
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(EventCodec.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/resources/flume-conf/flume-conf.sample.properties
----------------------------------------------------------------------
diff --git a/flume/src/main/resources/flume-conf/flume-conf.sample.properties b/flume/src/main/resources/flume-conf/flume-conf.sample.properties
new file mode 100644
index 0000000..9d3e430
--- /dev/null
+++ b/flume/src/main/resources/flume-conf/flume-conf.sample.properties
@@ -0,0 +1,45 @@
+#
+# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#agent1 on  node1
+ agent1.sources = netcatSource
+ agent1.channels = ch1
+ agent1.sinks = dt
+
+# first sink - dt
+ agent1.sinks.dt.type = com.datatorrent.flume.sink.DTFlumeSink
+ agent1.sinks.dt.id = sink1
+ agent1.sinks.dt.hostname = localhost
+ agent1.sinks.dt.port = 8080
+ agent1.sinks.dt.sleepMillis = 7
+ agent1.sinks.dt.throughputAdjustmentFactor = 2
+ agent1.sinks.dt.maximumEventsPerTransaction = 5000
+ agent1.sinks.dt.minimumEventsPerTransaction = 1
+ agent1.sinks.dt.storage = com.datatorrent.flume.storage.HDFSStorage
+ agent1.sinks.dt.storage.restore = false
+ agent1.sinks.dt.storage.baseDir = /tmp/flume101
+ agent1.sinks.dt.channel = ch1
+
+# channels
+ agent1.channels.ch1.type = file
+ agent1.channels.ch1.capacity = 10000000
+ agent1.channels.ch1.transactionCapacity = 10000
+ agent1.channels.ch1.maxFileSize = 67108864
+
+ agent1.sources.netcatSource.type = exec
+ agent1.sources.netcatSource.channels = ch1
+ agent1.sources.netcatSource.command = src/test/bash/subcat_periodically src/test/resources/test_data/dt_spend 10000 1

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/resources/flume-conf/flume-env.sample.sh
----------------------------------------------------------------------
diff --git a/flume/src/main/resources/flume-conf/flume-env.sample.sh b/flume/src/main/resources/flume-conf/flume-env.sample.sh
new file mode 100644
index 0000000..aca341c
--- /dev/null
+++ b/flume/src/main/resources/flume-conf/flume-env.sample.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+#
+# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# This script assumes the machine it runs on has a Maven repository populated
+# under $HOME/.m2. If that is not the case, adjust the JARPATH variable below
+# to point to a colon-separated list of directories where jar files can be found.
+if test -z "$DT_FLUME_JAR"
+then
+  echo [ERROR]: Environment variable DT_FLUME_JAR should point to a valid jar file which contains DTFlumeSink class >&2
+  exit 2
+fi
+
+echo JARPATH is set to ${JARPATH:=$HOME/.m2/repository:.}
+if test -z "$JAVA_HOME"
+then
+  JAVA=java
+else
+  JAVA=${JAVA_HOME}/bin/java
+fi
+FLUME_CLASSPATH=`JARPATH=$JARPATH $JAVA -cp $DT_FLUME_JAR com.datatorrent.jarpath.JarPath -N $DT_FLUME_JAR -Xdt-jarpath -Xdt-netlet`

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java b/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java
new file mode 100644
index 0000000..4acf764
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java
@@ -0,0 +1,142 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.discovery;
+
+import org.codehaus.jackson.type.TypeReference;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.InstanceSerializer;
+
+import com.datatorrent.flume.discovery.Discovery.Service;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Exercises {@link ZKAssistedDiscovery} using the connection string
+ * {@code localhost:2181}. The class is marked {@code @Ignore}; presumably because
+ * that server is not available in a regular build (confirm before enabling).
+ *
+ * @author Chetan Narsude <ch...@datatorrent.com>
+ */
+@Ignore
+public class ZKAssistedDiscoveryTest
+{
+  public ZKAssistedDiscoveryTest()
+  {
+  }
+
+  /**
+   * Builds a discovery object with the settings shared by every test in this class
+   * and calls {@code setup(null)} on it.
+   *
+   * @return a set-up discovery; the caller must invoke {@code teardown()} when done
+   */
+  private static ZKAssistedDiscovery newDiscovery()
+  {
+    ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
+    discovery.setServiceName("DTFlumeTest");
+    discovery.setConnectionString("localhost:2181");
+    discovery.setBasePath("/HelloDT");
+    discovery.setup(null);
+    return discovery;
+  }
+
+  /**
+   * Serializes a service instance, deserializes it again and checks that the payload
+   * is unchanged.
+   *
+   * @throws Exception if serialization or deserialization fails
+   */
+  @Test
+  public void testSerialization() throws Exception
+  {
+    ZKAssistedDiscovery discovery = newDiscovery();
+    try {
+      ServiceInstance<byte[]> instance = discovery.getInstance(new Service<byte[]>()
+      {
+        @Override
+        public String getHost()
+        {
+          return "localhost";
+        }
+
+        @Override
+        public int getPort()
+        {
+          return 8080;
+        }
+
+        @Override
+        public byte[] getPayload()
+        {
+          return null;
+        }
+
+        @Override
+        public String getId()
+        {
+          return "localhost8080";
+        }
+
+      });
+      InstanceSerializer<byte[]> instanceSerializer =
+          discovery.getInstanceSerializerFactory().getInstanceSerializer(new TypeReference<ServiceInstance<byte[]>>()
+          {
+          });
+      byte[] serialize = instanceSerializer.serialize(instance);
+      logger.debug("serialized json = {}", new String(serialize));
+      ServiceInstance<byte[]> deserialize = instanceSerializer.deserialize(serialize);
+      assertArrayEquals("Metadata", instance.getPayload(), deserialize.getPayload());
+    } finally {
+      /* release the discovery even when an assertion above fails */
+      discovery.teardown();
+    }
+  }
+
+  /**
+   * Checks that {@code discover()} returns a non-null collection.
+   */
+  @Test
+  public void testDiscover()
+  {
+    ZKAssistedDiscovery discovery = newDiscovery();
+    try {
+      assertNotNull("Discovered Sinks", discovery.discover());
+    } finally {
+      discovery.teardown();
+    }
+  }
+
+  /**
+   * Advertises a hand-built service; the test passes when no exception is thrown.
+   */
+  @Test
+  public void testAdvertize()
+  {
+    ZKAssistedDiscovery discovery = newDiscovery();
+    try {
+      Service<byte[]> service = new Service<byte[]>()
+      {
+        @Override
+        public String getHost()
+        {
+          return "chetan";
+        }
+
+        @Override
+        public int getPort()
+        {
+          return 5033;
+        }
+
+        @Override
+        public byte[] getPayload()
+        {
+          return new byte[] {3, 2, 1};
+        }
+
+        @Override
+        public String getId()
+        {
+          return "uniqueId";
+        }
+
+      };
+      discovery.advertise(service);
+    } finally {
+      discovery.teardown();
+    }
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscoveryTest.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java b/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java
new file mode 100644
index 0000000..41364c8
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java
@@ -0,0 +1,116 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.integration;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Event;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.flume.operator.AbstractFlumeInputOperator;
+import com.datatorrent.flume.storage.EventCodec;
+
+/**
+ * Streaming application which connects a flume input operator to a counting operator
+ * and attempts to run the resulting DAG in local mode. Marked {@code @Ignore}.
+ *
+ * @author Chetan Narsude <ch...@datatorrent.com>
+ */
+@Ignore
+public class ApplicationTest implements StreamingApplication
+{
+  private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
+
+  /**
+   * Flume input operator that emits each event exactly as it was received.
+   */
+  public static class FlumeInputOperator extends AbstractFlumeInputOperator<Event>
+  {
+    @Override
+    public Event convert(Event event)
+    {
+      return event;
+    }
+  }
+
+  /**
+   * Operator that counts the events arriving on its input port and logs the running
+   * total, together with the last event seen, at the end of every window.
+   */
+  public static class Counter implements Operator
+  {
+    private static final Logger logger = LoggerFactory.getLogger(Counter.class);
+
+    private int count;
+    private transient Event event;
+    public final transient DefaultInputPort<Event> input = new DefaultInputPort<Event>()
+    {
+      @Override
+      public void process(Event tuple)
+      {
+        event = tuple;
+        count++;
+      }
+
+    };
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+    }
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+    }
+
+    @Override
+    public void endWindow()
+    {
+      logger.debug("total count = {}, tuple = {}", count, event);
+    }
+
+    @Override
+    public void teardown()
+    {
+    }
+  }
+
+  /**
+   * Adds the flume input operator and the counter to the DAG and joins them with a
+   * container-local stream.
+   *
+   * @param dag the DAG to populate
+   * @param conf configuration; not used by this application
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    dag.setAttribute(com.datatorrent.api.Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 1000);
+
+    FlumeInputOperator source = dag.addOperator("FlumeOperator", new FlumeInputOperator());
+    String[] addresses = {"test:127.0.0.1:8080"};
+    source.setConnectAddresses(addresses);
+    source.setCodec(new EventCodec());
+
+    Counter sink = dag.addOperator("Counter", new Counter());
+    dag.addStream("Slices", source.output, sink.input).setLocality(Locality.CONTAINER_LOCAL);
+  }
+
+  /**
+   * Runs this application in local mode. Any exception is logged rather than
+   * rethrown, as the log message explains.
+   */
+  @Test
+  public void test()
+  {
+    try {
+      LocalMode.runApp(this, Integer.MAX_VALUE);
+    } catch (Exception e) {
+      logger.warn("The dag seems to be not testable yet, if it's - remove this exception handling", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java
new file mode 100644
index 0000000..464df42
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java
@@ -0,0 +1,85 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.interceptor;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.flume.Context;
+import org.apache.flume.interceptor.Interceptor;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests {@link ColumnFilteringInterceptor}, partly through the shared
+ * {@link InterceptorTestHelper} and partly with a dedicated column-zero case.
+ *
+ * @author Chetan Narsude <ch...@datatorrent.com>
+ */
+public class ColumnFilteringInterceptorTest
+{
+  private static InterceptorTestHelper helper;
+
+  /**
+   * Builds the interceptor settings used by the tests: destination separator 1,
+   * source separator 2 and the given column selection.
+   *
+   * @param columns value for the columns setting
+   * @return a fresh, mutable settings map
+   */
+  private static HashMap<String, String> newContextMap(String columns)
+  {
+    HashMap<String, String> settings = new HashMap<String, String>();
+    settings.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1));
+    settings.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+    settings.put(ColumnFilteringInterceptor.Constants.COLUMNS, columns);
+    return settings;
+  }
+
+  @BeforeClass
+  public static void startUp()
+  {
+    helper = new InterceptorTestHelper(new ColumnFilteringInterceptor.Builder(), newContextMap("1 2 3"));
+  }
+
+  @Test
+  public void testInterceptEvent()
+  {
+    helper.testIntercept_Event();
+  }
+
+  @Test
+  public void testFiles() throws IOException, URISyntaxException
+  {
+    helper.testFiles();
+  }
+
+  /**
+   * Selects only column 0 and checks the body produced for an empty event, a
+   * single-field event and an event whose first field is empty.
+   */
+  @Test
+  public void testInterceptEventWithColumnZero()
+  {
+    ColumnFilteringInterceptor.Builder columnZeroBuilder = new ColumnFilteringInterceptor.Builder();
+    columnZeroBuilder.configure(new Context(newContextMap("0")));
+    Interceptor columnZero = columnZeroBuilder.build();
+
+    byte[] expectedForEmpty = "\001".getBytes();
+    byte[] actualForEmpty = columnZero.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody();
+    assertArrayEquals("Empty Bytes", expectedForEmpty, actualForEmpty);
+
+    byte[] expectedForOne = "First\001".getBytes();
+    byte[] actualForOne = columnZero.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody();
+    assertArrayEquals("One Field", expectedForOne, actualForOne);
+
+    byte[] expectedForTwo = "\001".getBytes();
+    byte[] actualForTwo = columnZero.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody();
+    assertArrayEquals("Two Fields", expectedForTwo, actualForTwo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java b/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java
new file mode 100644
index 0000000..739184f
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java
@@ -0,0 +1,214 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.interceptor;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.interceptor.Interceptor;
+
+import com.datatorrent.netlet.util.Slice;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Reusable checks for an {@link Interceptor} produced by a given builder and a given
+ * set of settings: a table of hand-written events and a run over the data files
+ * found under {@code /test_data/gentxns/} on the classpath.
+ *
+ * @author Chetan Narsude <ch...@datatorrent.com>
+ */
+public class InterceptorTestHelper
+{
+  private static final byte FIELD_SEPARATOR = 1;
+
+  /**
+   * Minimal {@link Event} which carries only a body; headers are not supported.
+   */
+  static class MyEvent implements Event
+  {
+    byte[] body;
+
+    MyEvent(byte[] bytes)
+    {
+      body = bytes;
+    }
+
+    @Override
+    public Map<String, String> getHeaders()
+    {
+      return null;
+    }
+
+    @Override
+    public void setHeaders(Map<String, String> map)
+    {
+    }
+
+    @Override
+    @SuppressWarnings("ReturnOfCollectionOrArrayField")
+    public byte[] getBody()
+    {
+      return body;
+    }
+
+    @Override
+    @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter")
+    public void setBody(byte[] bytes)
+    {
+      body = bytes;
+    }
+  }
+
+  private final Interceptor.Builder builder;
+  private final Map<String, String> context;
+
+  /**
+   * @param builder builder of the interceptor under test
+   * @param context settings handed to the builder by {@link #testIntercept_Event()}
+   */
+  InterceptorTestHelper(Interceptor.Builder builder, Map<String, String> context)
+  {
+    this.builder = builder;
+    this.context = context;
+  }
+
+  /**
+   * Intercepts an event whose body is {@code input} and asserts that the resulting
+   * body equals {@code expected}.
+   */
+  private static void assertBody(Interceptor interceptor, String message, String expected, String input)
+  {
+    assertArrayEquals(message,
+        expected.getBytes(),
+        interceptor.intercept(new MyEvent(input.getBytes())).getBody());
+  }
+
+  /**
+   * Configures the builder with the settings passed to the constructor and checks the
+   * interceptor's output for a series of small hand-written events.
+   */
+  public void testIntercept_Event()
+  {
+    builder.configure(new Context(context));
+    Interceptor interceptor = builder.build();
+
+    assertBody(interceptor, "Empty Bytes", "\001\001\001", "");
+    assertBody(interceptor, "One Separator", "\001\001\001", "\002");
+    assertBody(interceptor, "Two Separators", "\001\001\001", "\002\002");
+    assertBody(interceptor, "One Field", "\001\001\001", "First");
+    assertBody(interceptor, "Two Fields", "First\001\001\001", "\002First");
+    assertBody(interceptor, "Two Fields", "\001\001\001", "First\001");
+    assertBody(interceptor, "Two Fields", "Second\001\001\001", "First\002Second");
+    assertBody(interceptor, "Three Fields", "Second\001\001\001", "First\002Second\002");
+    assertBody(interceptor, "Three Fields", "\001Second\001\001", "First\002\002Second");
+    assertBody(interceptor, "Four Fields", "\001Second\001\001", "First\002\002Second\002");
+    assertBody(interceptor, "Five Fields", "\001Second\001\001", "First\002\002Second\002\002");
+    assertBody(interceptor, "Six Fields", "\001Second\001\001", "First\002\002Second\002\002\002");
+  }
+
+  /**
+   * Configures the builder from {@code /flume/conf/flume-conf.properties} and runs the
+   * resulting interceptor over every file under {@code /test_data/gentxns/}, expecting
+   * 2200 records in total.
+   *
+   * @throws IOException if the configuration or a data file cannot be read
+   * @throws URISyntaxException if the data directory URL cannot be converted to a URI
+   */
+  public void testFiles() throws IOException, URISyntaxException
+  {
+    Properties properties = new Properties();
+    InputStream conf = getClass().getResourceAsStream("/flume/conf/flume-conf.properties");
+    assertNotNull("Flume configuration", conf);
+    try {
+      properties.load(conf);
+    } finally {
+      conf.close();
+    }
+
+    /* find the "<prefix>.type" key whose value names the builder; keep "<prefix>." */
+    String interceptor = null;
+    for (Entry<Object, Object> entry : properties.entrySet()) {
+      logger.debug("{} => {}", entry.getKey(), entry.getValue());
+
+      if (builder.getClass().getName().equals(entry.getValue().toString())) {
+        String key = entry.getKey().toString();
+        if (key.endsWith(".type")) {
+          interceptor = key.substring(0, key.length() - "type".length());
+          break;
+        }
+      }
+    }
+
+    assertNotNull(builder.getClass().getName(), interceptor);
+    @SuppressWarnings({"null", "ConstantConditions"})
+    final int interceptorLength = interceptor.length();
+
+    /* collect the settings under that prefix, with the prefix stripped */
+    HashMap<String, String> map = new HashMap<String, String>();
+    for (Entry<Object, Object> entry : properties.entrySet()) {
+      String key = entry.getKey().toString();
+      if (key.startsWith(interceptor)) {
+        map.put(key.substring(interceptorLength), entry.getValue().toString());
+      }
+    }
+
+    builder.configure(new Context(map));
+    Interceptor interceptorInstance = builder.build();
+
+    URL url = getClass().getResource("/test_data/gentxns/");
+    assertNotNull("Generated Transactions", url);
+
+    File dir = new File(url.toURI());
+    /* listFiles() returns null when the path is not a readable directory */
+    File[] files = dir.listFiles();
+    assertNotNull("Generated Transactions directory listing", files);
+
+    int records = 0;
+    for (File file : files) {
+      records += processFile(file, interceptorInstance);
+    }
+
+    Assert.assertEquals("Total Records", 2200, records);
+  }
+
+  /**
+   * Feeds every line of the named data file through the interceptor and checks that
+   * the GUID parsed from the output equals the first 32 bytes of the input line.
+   *
+   * @param file data file; only its name is used, the content is read from the classpath
+   * @param interceptor interceptor under test
+   * @return number of lines processed
+   * @throws IOException if the file cannot be read
+   */
+  private int processFile(File file, Interceptor interceptor) throws IOException
+  {
+    InputStream stream = getClass().getResourceAsStream("/test_data/gentxns/" + file.getName());
+    assertNotNull(file.getName(), stream);
+    BufferedReader br = new BufferedReader(new InputStreamReader(stream));
+
+    try {
+      String line;
+      int i = 0;
+      while ((line = br.readLine()) != null) {
+        byte[] body = interceptor.intercept(new MyEvent(line.getBytes())).getBody();
+        RawEvent event = RawEvent.from(body, FIELD_SEPARATOR);
+        /* RawEvent.from returns null when it cannot find the date field */
+        assertNotNull("Parsed event", event);
+        Assert.assertEquals("GUID", new Slice(line.getBytes(), 0, 32), event.guid);
+        logger.debug("guid = {}, time = {}", event.guid, event.time);
+        i++;
+      }
+      return i;
+    } finally {
+      /* close the reader even when an assertion or a read fails */
+      br.close();
+    }
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(InterceptorTestHelper.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java b/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java
new file mode 100644
index 0000000..049609b
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java
@@ -0,0 +1,119 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.interceptor;
+
+import java.io.Serializable;
+
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A record reduced to its first two separator-delimited fields: the GUID and a
+ * timestamp, plus the offset at which the remaining fields begin.
+ *
+ * @author Chetan Narsude <ch...@datatorrent.com>
+ */
+public class RawEvent implements Serializable
+{
+  private static final long serialVersionUID = 201312191312L;
+  private static final Logger logger = LoggerFactory.getLogger(RawEvent.class);
+  private static final DateTimeFormatter DATE_PARSER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
+
+  public Slice guid;
+  public long time;
+  public int dimensionsOffset;
+
+  RawEvent()
+  {
+    /* needed for Kryo serialization */
+  }
+
+  public Slice getGUID()
+  {
+    return guid;
+  }
+
+  public long getTime()
+  {
+    return time;
+  }
+
+  /**
+   * Parses the first two fields of a row.
+   *
+   * @param row raw bytes of the record
+   * @param separator byte which terminates each field
+   * @return the parsed event, or {@code null} when the row contains no separator
+   *         after the start of the date field
+   */
+  public static RawEvent from(byte[] row, byte separator)
+  {
+    final int rowsize = row.length;
+
+    /* the guid runs up to the first separator, or to the end of the row */
+    int guidLength = 0;
+    while (guidLength < rowsize && row[guidLength] != separator) {
+      guidLength++;
+    }
+
+    /* the date starts right after that separator and runs up to the next one */
+    final int dateStart = guidLength + 1;
+    for (int pos = dateStart; pos < rowsize; pos++) {
+      if (row[pos] != separator) {
+        continue;
+      }
+
+      long millis = DATE_PARSER.parseMillis(new String(row, dateStart, pos - dateStart));
+      RawEvent parsed = new RawEvent();
+      parsed.guid = new Slice(row, 0, guidLength);
+      parsed.time = millis;
+      parsed.dimensionsOffset = pos + 1;
+      return parsed;
+    }
+
+    return null;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    final int guidHash = this.guid == null ? 0 : this.guid.hashCode();
+    final int timeHash = (int)(this.time ^ (this.time >>> 32));
+    return 61 * (61 * 5 + guidHash) + timeHash;
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final RawEvent that = (RawEvent)obj;
+    final boolean sameGuid = this.guid == that.guid || (this.guid != null && this.guid.equals(that.guid));
+    return sameGuid && this.time == that.time;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "RawEvent{guid=" + guid + ", time=" + time + '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java b/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java
new file mode 100644
index 0000000..a615496
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.operator;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks the {@link ThreadLocal} behaviour exercised below: the value created by
+ * {@code initialValue()} is handed out again on the next {@code get()} from the
+ * same thread.
+ *
+ * @author Chetan Narsude <ch...@datatorrent.com>
+ */
+public class AbstractFlumeInputOperatorTest
+{
+  /**
+   * Adds an element to the set obtained from a thread local and verifies that a
+   * second {@code get()} on the same thread still sees that element.
+   */
+  @Test
+  public void testThreadLocal()
+  {
+    final ThreadLocal<Set<Integer>> perThread = new ThreadLocal<Set<Integer>>()
+    {
+      @Override
+      protected Set<Integer> initialValue()
+      {
+        return new HashSet<Integer>();
+      }
+
+    };
+
+    final Set<Integer> first = perThread.get();
+    first.add(1);
+    assertTrue("Just Added Value", first.contains(1));
+
+    final Set<Integer> second = perThread.get();
+    assertTrue("Previously added value", second.contains(1));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java b/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java
new file mode 100644
index 0000000..833a353
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java
@@ -0,0 +1,143 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.sink;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.channel.MemoryChannel;
+
+import com.datatorrent.flume.discovery.Discovery;
+import com.datatorrent.netlet.AbstractLengthPrependerClient;
+import com.datatorrent.netlet.DefaultEventLoop;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Starts a {@link DTFlumeSink} configured with port 0, connects a client to the port
+ * reported through the discovery callback, sends one ECHO request and verifies that
+ * the response carries the bytes of the request.
+ *
+ * @author Chetan Narsude <ch...@datatorrent.com>
+ */
+public class DTFlumeSinkTest
+{
+  static final String hostname = "localhost";
+  int port = 0;
+
+  /* guarded by this: true once the client has handled a message */
+  private boolean responseReceived;
+  /* guarded by this: whatever went wrong while the client handled the message */
+  private Throwable failure;
+
+  /**
+   * Runs the echo round trip described on the class.
+   *
+   * @throws InterruptedException if the test thread is interrupted while waiting
+   * @throws IOException if the client event loop cannot be created
+   */
+  @Test
+  @SuppressWarnings("SleepWhileInLoop")
+  public void testServer() throws InterruptedException, IOException
+  {
+    Discovery<byte[]> discovery = new Discovery<byte[]>()
+    {
+      /* guarded by this: true once advertise or unadvertise has been called */
+      private boolean signalled;
+
+      @Override
+      public synchronized void unadvertise(Service<byte[]> service)
+      {
+        signalled = true;
+        notifyAll();
+      }
+
+      @Override
+      public synchronized void advertise(Service<byte[]> service)
+      {
+        port = service.getPort();
+        logger.debug("listening at {}", service);
+        signalled = true;
+        notifyAll();
+      }
+
+      @Override
+      @SuppressWarnings("unchecked")
+      public synchronized Collection<Service<byte[]>> discover()
+      {
+        try {
+          /*
+           * Waiting on a flag instead of a bare wait() keeps this correct when the
+           * notification arrives before the wait starts, and on spurious wakeups.
+           */
+          while (!signalled) {
+            wait();
+          }
+        } catch (InterruptedException ie) {
+          throw new RuntimeException(ie);
+        }
+        return Collections.EMPTY_LIST;
+      }
+
+    };
+    DTFlumeSink sink = new DTFlumeSink();
+    sink.setName("TeskSink");
+    sink.setHostname(hostname);
+    sink.setPort(0);
+    sink.setAcceptedTolerance(2000);
+    sink.setChannel(new MemoryChannel());
+    sink.setDiscovery(discovery);
+    sink.start();
+    try {
+      AbstractLengthPrependerClient client = new AbstractLengthPrependerClient()
+      {
+        private byte[] array;
+        private int offset = 2;
+
+        @Override
+        public void onMessage(byte[] buffer, int offset, int size)
+        {
+          Throwable problem = null;
+          try {
+            Slice received = new Slice(buffer, offset, size);
+            logger.debug("Client Received = {}", received);
+            Assert.assertEquals(
+                new Slice(Arrays.copyOfRange(array, this.offset, array.length), 0, Server.Request.FIXED_SIZE),
+                received);
+          } catch (Throwable t) {
+            /* this is not the test thread; hand the problem over instead of losing it */
+            problem = t;
+          }
+          synchronized (DTFlumeSinkTest.this) {
+            if (problem != null) {
+              failure = problem;
+            }
+            responseReceived = true;
+            DTFlumeSinkTest.this.notifyAll();
+          }
+        }
+
+        @Override
+        public void connected()
+        {
+          super.connected();
+          array = new byte[Server.Request.FIXED_SIZE + offset];
+          array[offset] = Server.Command.ECHO.getOrdinal();
+          array[offset + 1] = 1;
+          array[offset + 2] = 2;
+          array[offset + 3] = 3;
+          array[offset + 4] = 4;
+          array[offset + 5] = 5;
+          array[offset + 6] = 6;
+          array[offset + 7] = 7;
+          array[offset + 8] = 8;
+          Server.writeLong(array, offset + Server.Request.TIME_OFFSET, System.currentTimeMillis());
+          write(array, offset, Server.Request.FIXED_SIZE);
+        }
+
+      };
+
+      DefaultEventLoop eventloop = new DefaultEventLoop("Eventloop-TestClient");
+      eventloop.start();
+      try {
+        /* blocks until the discovery callbacks above have published the port */
+        discovery.discover();
+        eventloop.connect(new InetSocketAddress(hostname, port), client);
+        try {
+          synchronized (this) {
+            while (!responseReceived) {
+              this.wait();
+            }
+          }
+        } finally {
+          eventloop.disconnect(client);
+        }
+      } finally {
+        eventloop.stop();
+      }
+    } finally {
+      sink.stop();
+    }
+
+    /* fail on the test thread if the check inside the client failed */
+    synchronized (this) {
+      if (failure != null) {
+        AssertionError error = new AssertionError("Echo verification failed in the client callback");
+        error.initCause(failure);
+        throw error;
+      }
+    }
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(DTFlumeSinkTest.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java b/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java
new file mode 100644
index 0000000..64495db
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java
@@ -0,0 +1,92 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.sink;
+
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for the int and long encoding helpers of {@link Server}: each
+ * value is written at offset 0 of a scratch buffer and read back.
+ *
+ * @author Chetan Narsude <ch...@datatorrent.com>
+ */
+public class ServerTest
+{
+  byte[] array = new byte[1024];
+
+  /**
+   * Writes {@code value} with {@code Server.writeInt} and asserts that
+   * {@code Server.readInt} returns it.
+   */
+  private void assertIntRoundTrip(String message, int value)
+  {
+    Server.writeInt(array, 0, value);
+    Assert.assertEquals(message, value, Server.readInt(array, 0));
+  }
+
+  /**
+   * Writes {@code value} with {@code Server.writeLong} and asserts that
+   * {@code Server.readLong} returns it.
+   */
+  private void assertLongRoundTrip(String message, long value)
+  {
+    Server.writeLong(array, 0, value);
+    Assert.assertEquals(message, value, Server.readLong(array, 0));
+  }
+
+  @Test
+  public void testInt()
+  {
+    assertIntRoundTrip("Max Integer", Integer.MAX_VALUE);
+    assertIntRoundTrip("Min Integer", Integer.MIN_VALUE);
+    assertIntRoundTrip("Zero Integer", 0);
+
+    Random rand = new Random();
+    for (int remaining = 128; remaining > 0; remaining--) {
+      final int drawn = rand.nextInt();
+      final int value = rand.nextBoolean() ? -drawn : drawn;
+      assertIntRoundTrip("Random Integer", value);
+    }
+  }
+
+  @Test
+  public void testLong()
+  {
+    assertLongRoundTrip("Max Integer", Integer.MAX_VALUE);
+    assertLongRoundTrip("Min Integer", Integer.MIN_VALUE);
+    assertLongRoundTrip("Zero Integer", 0);
+
+    assertLongRoundTrip("Max Long", Long.MAX_VALUE);
+    assertLongRoundTrip("Min Long", Long.MIN_VALUE);
+    assertLongRoundTrip("Zero Long", 0L);
+
+    Random rand = new Random();
+    for (int remaining = 128; remaining > 0; remaining--) {
+      final long drawn = rand.nextLong();
+      final long value = rand.nextBoolean() ? -drawn : drawn;
+      assertLongRoundTrip("Random Long", value);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/flume/conf/flume-conf.properties
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/flume/conf/flume-conf.properties b/flume/src/test/resources/flume/conf/flume-conf.properties
new file mode 100644
index 0000000..c892c53
--- /dev/null
+++ b/flume/src/test/resources/flume/conf/flume-conf.properties
@@ -0,0 +1,85 @@
+#
+# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# agent1 on node1
+agent1.channels = ch1
+agent1.sources = netcatSource
+agent1.sinks = dt
+
+# channels
+agent1.channels.ch1.type = file
+agent1.channels.ch1.capacity = 10000000
+agent1.channels.ch1.transactionCapacity = 10000
+agent1.channels.ch1.maxFileSize = 67108864
+
+agent1.sources.netcatSource.type = exec
+agent1.sources.netcatSource.channels = ch1
+agent1.sources.netcatSource.command = src/test/bash/subcat_periodically src/test/resources/test_data/dt_spend 10000 1
+# Pick and reorder the columns we need from a larger record for efficiency
+  agent1.sources.netcatSource.interceptors = columnchooser
+  agent1.sources.netcatSource.interceptors.columnchooser.type = com.datatorrent.flume.interceptor.ColumnFilteringInterceptor$Builder
+  agent1.sources.netcatSource.interceptors.columnchooser.srcSeparator = 2
+  agent1.sources.netcatSource.interceptors.columnchooser.dstSeparator = 1
+  agent1.sources.netcatSource.interceptors.columnchooser.columns = 0 43 62 69 68 139 190 70 71 52 75 37 39 42 191 138
+
+ agent2.sources.netcatSource.interceptors.columnchooser.type = com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor$Builder
+ agent2.sources.netcatSource.interceptors.columnchooser.srcSeparator = 2
+ agent2.sources.netcatSource.interceptors.columnchooser.columnsFormatter = {0}\u0001{43}\u0001{62}\u0001{69}\u0001{68}\u0001{139}\u0001{190}\u0001{70}\u0001{71}\u0001{52}\u0001{75}\u0001{37}\u0001{39}\u0001{42}\u0001{191}\u0001{138}\u0001
+
+# index  -- description -- type if different
+#  0 Slice guid; // long
+#  43 public long time // yyyy-MM-dd HH:mm:ss
+#  62 public long adv_id;
+#  69 public int cmp_type; // string
+#  68 public long cmp_id;
+#  139 public long line_id;
+#  190 public long bslice_id;
+#  70 public long ao_id;
+#  71 public long creative_id;
+#  52 public long algo_id;
+#  75 public int device_model_id; // string
+#  37 public long impressions;
+#  39 public long clicks;
+#  42 public double spend;
+#  191 public double bonus_spend;
+#  138 public double spend_local;
+#
+
+# first sink - dt
+agent1.sinks.dt.id = CEVL00P
+agent1.sinks.dt.type = com.datatorrent.flume.sink.DTFlumeSink
+agent1.sinks.dt.hostname = localhost
+agent1.sinks.dt.port = 8080
+agent1.sinks.dt.sleepMillis = 7
+agent1.sinks.dt.throughputAdjustmentFactor = 2
+agent1.sinks.dt.maximumEventsPerTransaction = 5000
+agent1.sinks.dt.minimumEventsPerTransaction = 1
+
+# Ensure that we do not lose the data handed over to us by flume.
+    agent1.sinks.dt.storage = com.datatorrent.flume.storage.HDFSStorage
+    agent1.sinks.dt.storage.restore = false
+    agent1.sinks.dt.storage.baseDir = /tmp/flume101
+    agent1.sinks.dt.channel = ch1
+
+# Ensure that we are able to detect flume sinks (and failures) automatically.
+   agent1.sinks.dt.discovery = com.datatorrent.flume.discovery.ZKAssistedDiscovery
+   agent1.sinks.dt.discovery.connectionString = 127.0.0.1:2181
+   agent1.sinks.dt.discovery.basePath = /HelloDT
+   agent1.sinks.dt.discovery.connectionTimeoutMillis = 1000
+   agent1.sinks.dt.discovery.connectionRetryCount = 10
+   agent1.sinks.dt.discovery.connectionRetrySleepMillis = 500
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/flume/conf/flume-env.sh
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/flume/conf/flume-env.sh b/flume/src/test/resources/flume/conf/flume-env.sh
new file mode 100644
index 0000000..c2232ea
--- /dev/null
+++ b/flume/src/test/resources/flume/conf/flume-env.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+#
+# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# This script expects a Maven repository to be populated under $HOME/.m2 on the
+# machine where it runs. If that is not the case, adjust the JARPATH variable
+# below to a colon-separated list of directories where jar files can be found.
+if test -z "$DT_FLUME_JAR"
+then
+  # Quoted so the shell never treats "[ERROR]:" as a pathname pattern.
+  echo "[ERROR]: Environment variable DT_FLUME_JAR should point to a valid jar file which contains DTFlumeSink class" >&2
+  exit 2
+fi
+
+# Assign the default to JARPATH when it is unset or empty, then report it.
+echo "JARPATH is set to ${JARPATH:=$HOME/.m2/repository:.}"
+if test -z "$JAVA_HOME"
+then
+  JAVA=java
+else
+  JAVA=${JAVA_HOME}/bin/java
+fi
+# Expansions are quoted so that paths containing whitespace are passed intact.
+FLUME_CLASSPATH=$(JARPATH="$JARPATH" "$JAVA" -cp "$DT_FLUME_JAR" com.datatorrent.jarpath.JarPath -N "$DT_FLUME_JAR" -Xdt-jarpath -Xdt-netlet)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/log4j.properties b/flume/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ac0a107
--- /dev/null
+++ b/flume/src/test/resources/log4j.properties
@@ -0,0 +1,38 @@
+#
+# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=INFO,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+log4j.appender.malhar=org.apache.log4j.RollingFileAppender
+log4j.appender.malhar.layout=org.apache.log4j.PatternLayout
+log4j.appender.malhar.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+#log4j.appender.malhar.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+#log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121500
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121500 b/flume/src/test/resources/test_data/gentxns/2013121500
new file mode 100644
index 0000000..3ce5646
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121500 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121501
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121501 b/flume/src/test/resources/test_data/gentxns/2013121501
new file mode 100644
index 0000000..b2e70c0
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121501 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121502
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121502 b/flume/src/test/resources/test_data/gentxns/2013121502
new file mode 100644
index 0000000..ec13862
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121502 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121503
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121503 b/flume/src/test/resources/test_data/gentxns/2013121503
new file mode 100644
index 0000000..8267dd3
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121503 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121504
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121504 b/flume/src/test/resources/test_data/gentxns/2013121504
new file mode 100644
index 0000000..addfe62
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121504 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121505
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121505 b/flume/src/test/resources/test_data/gentxns/2013121505
new file mode 100644
index 0000000..d76aa9f
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121505 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121506
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121506 b/flume/src/test/resources/test_data/gentxns/2013121506
new file mode 100644
index 0000000..2f5bbb6
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121506 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121507
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121507 b/flume/src/test/resources/test_data/gentxns/2013121507
new file mode 100644
index 0000000..a022dad
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121507 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121508
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121508 b/flume/src/test/resources/test_data/gentxns/2013121508
new file mode 100644
index 0000000..d1e7f5c
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121508 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121509
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121509 b/flume/src/test/resources/test_data/gentxns/2013121509
new file mode 100644
index 0000000..10d61de
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121509 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121510
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121510 b/flume/src/test/resources/test_data/gentxns/2013121510
new file mode 100644
index 0000000..c2f76c8
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121510 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121511
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121511 b/flume/src/test/resources/test_data/gentxns/2013121511
new file mode 100644
index 0000000..bf16cfe
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121511 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121512
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121512 b/flume/src/test/resources/test_data/gentxns/2013121512
new file mode 100644
index 0000000..fe75419
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121512 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121513
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121513 b/flume/src/test/resources/test_data/gentxns/2013121513
new file mode 100644
index 0000000..3094cae
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121513 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121514
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121514 b/flume/src/test/resources/test_data/gentxns/2013121514
new file mode 100644
index 0000000..6e00e4a
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121514 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121515
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121515 b/flume/src/test/resources/test_data/gentxns/2013121515
new file mode 100644
index 0000000..b860e43
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121515 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121516
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121516 b/flume/src/test/resources/test_data/gentxns/2013121516
new file mode 100644
index 0000000..dfb5854
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121516 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121517
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121517 b/flume/src/test/resources/test_data/gentxns/2013121517
new file mode 100644
index 0000000..c8da2cc
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121517 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121518
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121518 b/flume/src/test/resources/test_data/gentxns/2013121518
new file mode 100644
index 0000000..2cb628b
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121518 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121519
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121519 b/flume/src/test/resources/test_data/gentxns/2013121519
new file mode 100644
index 0000000..6fab9d9
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121519 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121520
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121520 b/flume/src/test/resources/test_data/gentxns/2013121520
new file mode 100644
index 0000000..ba56d49
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121520 differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121521
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/test_data/gentxns/2013121521 b/flume/src/test/resources/test_data/gentxns/2013121521
new file mode 100644
index 0000000..37de926
Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121521 differ