Posted to commits@apex.apache.org by th...@apache.org on 2017/05/23 01:24:02 UTC

[04/13] apex-malhar git commit: Changed package path for files to be included under malhar. Modifications to build files for project to build under malhar.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/storage/ErrorMaskingEventCodec.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/ErrorMaskingEventCodec.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/ErrorMaskingEventCodec.java
new file mode 100644
index 0000000..b8d2725
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/ErrorMaskingEventCodec.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Event;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>An {@link EventCodec} that masks (de)serialization errors: a failed conversion is logged and null is returned instead of propagating the exception.</p>
+ *
+ * @since 1.0.4
+ */
+public class ErrorMaskingEventCodec extends EventCodec
+{
+
+  @Override
+  public Object fromByteArray(Slice fragment)
+  {
+    try {
+      return super.fromByteArray(fragment);
+    } catch (RuntimeException re) {
+      logger.warn("Cannot deserialize event {}", fragment, re);
+    }
+
+    return null;
+  }
+
+  @Override
+  public Slice toByteArray(Event event)
+  {
+    try {
+      return super.toByteArray(event);
+    } catch (RuntimeException re) {
+      logger.warn("Cannot serialize event {}", event, re);
+    }
+
+    return null;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ErrorMaskingEventCodec.class);
+}
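
The value of this codec over the plain EventCodec below is that a corrupt or truncated slice is logged and masked as null rather than failing the operator. A minimal, hypothetical sketch (the demo class and the assumption that Kryo rejects the garbage input with a RuntimeException are not part of this change):

    import org.apache.apex.malhar.flume.storage.ErrorMaskingEventCodec;
    import com.datatorrent.netlet.util.Slice;

    public class ErrorMaskingDemo
    {
      public static void main(String[] args)
      {
        ErrorMaskingEventCodec codec = new ErrorMaskingEventCodec();
        byte[] garbage = {0x01, 0x02, 0x03};
        // Not a valid Kryo-serialized event: fromByteArray logs a warning and
        // returns null instead of propagating the RuntimeException from Kryo.
        Object event = codec.fromByteArray(new Slice(garbage, 0, garbage.length));
        System.out.println(event); // expected: null
      }
    }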

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/storage/EventCodec.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/EventCodec.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/EventCodec.java
new file mode 100644
index 0000000..463551e
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/EventCodec.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>A Kryo-based {@link StreamCodec} for Flume events that serializes the event headers and body.</p>
+ *
+ * @since 0.9.4
+ */
+public class EventCodec implements StreamCodec<Event>
+{
+  private final transient Kryo kryo;
+
+  public EventCodec()
+  {
+    this.kryo = new Kryo();
+    this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+  }
+
+  @Override
+  public Object fromByteArray(Slice fragment)
+  {
+    ByteArrayInputStream is = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length);
+    Input input = new Input(is);
+
+    @SuppressWarnings("unchecked")
+    HashMap<String, String> headers = kryo.readObjectOrNull(input, HashMap.class);
+    byte[] body = kryo.readObjectOrNull(input, byte[].class);
+    return EventBuilder.withBody(body, headers);
+  }
+
+  @Override
+  public Slice toByteArray(Event event)
+  {
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    Output output = new Output(os);
+
+    Map<String, String> headers = event.getHeaders();
+    if (headers != null && headers.getClass() != HashMap.class) {
+      HashMap<String, String> tmp = new HashMap<String, String>(headers.size());
+      tmp.putAll(headers);
+      headers = tmp;
+    }
+    kryo.writeObjectOrNull(output, headers, HashMap.class);
+    kryo.writeObjectOrNull(output, event.getBody(), byte[].class);
+    output.flush();
+    final byte[] bytes = os.toByteArray();
+    return new Slice(bytes, 0, bytes.length);
+  }
+
+  @Override
+  public int getPartition(Event o)
+  {
+    return o.hashCode();
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(EventCodec.class);
+}
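
A minimal round-trip sketch of the codec (the standalone class and header values are illustrative; EventBuilder.withBody is the standard Flume API):

    import java.util.Collections;

    import org.apache.apex.malhar.flume.storage.EventCodec;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;
    import com.datatorrent.netlet.util.Slice;

    public class EventCodecRoundTrip
    {
      public static void main(String[] args)
      {
        EventCodec codec = new EventCodec();
        Event in = EventBuilder.withBody("hello".getBytes(),
            Collections.singletonMap("host", "node1"));

        // Headers and body are written with Kryo and read back into a new Event.
        Slice slice = codec.toByteArray(in);
        Event out = (Event)codec.fromByteArray(slice);

        System.out.println(new String(out.getBody()) + " " + out.getHeaders());
      }
    }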

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java
new file mode 100644
index 0000000..77aeb68
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java
@@ -0,0 +1,947 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.storage;
+
+import java.io.DataInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.common.util.NameableThreadFactory;
+import org.apache.apex.malhar.flume.sink.Server;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * HDFSStorage stores and retrieves data from HDFS.
+ * <p />
+ * The properties that can be set on HDFSStorage are: <br />
+ * baseDir - The base directory where the data is stored <br />
+ * restore - Whether to restore the state from a previous failure <br />
+ * blockSize - The maximum size of each file created <br />
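+ * <p />
+ * An illustrative sink configuration (key names follow the bundled
+ * flume-conf.sample.properties in this change; values are examples only):
+ * <pre>
+ *   agent1.sinks.dt.storage = org.apache.apex.malhar.flume.storage.HDFSStorage
+ *   agent1.sinks.dt.storage.restore = false
+ *   agent1.sinks.dt.storage.baseDir = /tmp/flume101
+ * </pre>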
+ *
+ * @since 0.9.3
+ */
+public class HDFSStorage implements Storage, Configurable, Component<com.datatorrent.api.Context>
+{
+  public static final int DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
+  public static final String BASE_DIR_KEY = "baseDir";
+  public static final String RESTORE_KEY = "restore";
+  public static final String BLOCKSIZE = "blockSize";
+  public static final String BLOCK_SIZE_MULTIPLE = "blockSizeMultiple";
+  public static final String NUMBER_RETRY = "retryCount";
+
+  private static final String OFFSET_SUFFIX = "-offsetFile";
+  private static final String BOOK_KEEPING_FILE_OFFSET = "-bookKeepingOffsetFile";
+  private static final String FLUSHED_IDENTITY_FILE = "flushedCounter";
+  private static final String CLEAN_OFFSET_FILE = "cleanoffsetFile";
+  private static final String FLUSHED_IDENTITY_FILE_TEMP = "flushedCounter.tmp";
+  private static final String CLEAN_OFFSET_FILE_TEMP = "cleanoffsetFile.tmp";
+  private static final int IDENTIFIER_SIZE = 8;
+  private static final int DATA_LENGTH_BYTE_SIZE = 4;
+
+  /**
+   * Number of times the storage will try to get the filesystem
+   */
+  private int retryCount = 3;
+  /**
+   * Multiplier applied to the block size to determine the maximum size of each storage file
+   */
+  private int blockSizeMultiple = 1;
+  /**
+   * Identifier for this storage.
+   */
+  @NotNull
+  private String id;
+  /**
+   * The baseDir where the storage facility is going to create files.
+   */
+  @NotNull
+  private String baseDir;
+  /**
+   * The block size to be used to create the storage files
+   */
+  private long blockSize;
+  /**
+   * Whether to restore the state from a previous run
+   */
+  private boolean restore;
+  /**
+   * This identifies the current file number
+   */
+  private long currentWrittenFile;
+  /**
+   * This identifies the file number that has been flushed
+   */
+  private long flushedFileCounter;
+  /**
+   * The file that stores the fileCounter information
+   */
+  // private Path fileCounterFile;
+  /**
+   * The file that stores the flushed fileCounter information
+   */
+  private Path flushedCounterFile;
+  private Path flushedCounterFileTemp;
+  /**
+   * This identifies the last cleaned file number
+   */
+  private long cleanedFileCounter;
+  /**
+   * The file that stores the clean file counter information
+   */
+  // private Path cleanFileCounterFile;
+  /**
+   * The file that stores the clean file offset information
+   */
+  private Path cleanFileOffsetFile;
+  private Path cleanFileOffsetFileTemp;
+  private FileSystem fs;
+  private FSDataOutputStream dataStream;
+  ArrayList<DataBlock> files2Commit = new ArrayList<DataBlock>();
+  /**
+   * The offset in the current opened file
+   */
+  private long fileWriteOffset;
+  private FSDataInputStream readStream;
+  private long retrievalOffset;
+  private long retrievalFile;
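+  /**
+   * Byte offset (4) within an eight-byte address at which the file-counter half starts; initialized in setup()
+   */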
+  private int offset;
+  private long flushedLong;
+  private long flushedFileWriteOffset;
+  private long bookKeepingFileOffset;
+  private byte[] cleanedOffset = new byte[8];
+  private long skipOffset;
+  private long skipFile;
+  private transient Path basePath;
+  private ExecutorService storageExecutor;
+  private byte[] currentData;
+  private FSDataInputStream nextReadStream;
+  private long nextFlushedLong;
+  private long nextRetrievalFile;
+  private byte[] nextRetrievalData;
+
+  public HDFSStorage()
+  {
+    this.restore = true;
+  }
+
+  /**
+   * Configures the storage from the Flume context: the storage id, base directory,
+   * restore flag, block size, block size multiple and retry count.
+   *
+   * @param ctx the Flume context holding the configuration
+   */
+  @Override
+  public void configure(Context ctx)
+  {
+    String tempId = ctx.getString(ID);
+    if (tempId == null) {
+      if (id == null) {
+        throw new IllegalArgumentException("id can't be null.");
+      }
+    } else {
+      id = tempId;
+    }
+
+    String tempBaseDir = ctx.getString(BASE_DIR_KEY);
+    if (tempBaseDir != null) {
+      baseDir = tempBaseDir;
+    }
+
+    restore = ctx.getBoolean(RESTORE_KEY, restore);
+    Long tempBlockSize = ctx.getLong(BLOCKSIZE);
+    if (tempBlockSize != null) {
+      blockSize = tempBlockSize;
+    }
+    blockSizeMultiple = ctx.getInteger(BLOCK_SIZE_MULTIPLE, blockSizeMultiple);
+    retryCount = ctx.getInteger(NUMBER_RETRY,retryCount);
+  }
+
+  /**
+   * Reads the file at the given location and returns the bytes stored in it.
+   *
+   * @param path the location of the file
+   * @return the bytes read from the file
+   * @throws IOException
+   */
+  byte[] readData(Path path) throws IOException
+  {
+    DataInputStream is = new DataInputStream(fs.open(path));
+    byte[] bytes = new byte[is.available()];
+    is.readFully(bytes);
+    is.close();
+    return bytes;
+  }
+
+  /**
+   * Writes the given bytes to the file at the specified path.
+   *
+   * @param path the file location
+   * @param data the data to be written to the file
+   * @return the open output stream used to write the data
+   * @throws IOException
+   */
+  private FSDataOutputStream writeData(Path path, byte[] data) throws IOException
+  {
+    FSDataOutputStream fsOutputStream;
+    if (fs.getScheme().equals("file")) {
+      // local FS does not support hflush and does not flush native stream
+      fsOutputStream = new FSDataOutputStream(
+          new FileOutputStream(Path.getPathWithoutSchemeAndAuthority(path).toString()), null);
+    } else {
+      fsOutputStream = fs.create(path);
+    }
+    fsOutputStream.write(data);
+    return fsOutputStream;
+  }
+
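+  /**
+   * Packs an address into a single long: the file counter occupies the upper 32 bits
+   * and the offset within that file the lower 32 bits.
+   */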
+  private long calculateOffset(long fileOffset, long fileCounter)
+  {
+    return ((fileCounter << 32) | (fileOffset & 0xffffffffL));
+  }
+
+  @Override
+  public byte[] store(Slice slice)
+  {
+    // logger.debug("store message ");
+    int bytesToWrite = slice.length + DATA_LENGTH_BYTE_SIZE;
+    if (currentWrittenFile < skipFile) {
+      fileWriteOffset += bytesToWrite;
+      if (fileWriteOffset >= bookKeepingFileOffset) {
+        files2Commit.add(new DataBlock(null, bookKeepingFileOffset,
+            new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile));
+        currentWrittenFile++;
+        if (fileWriteOffset > bookKeepingFileOffset) {
+          fileWriteOffset = bytesToWrite;
+        } else {
+          fileWriteOffset = 0;
+        }
+        try {
+          bookKeepingFileOffset = getFlushedFileWriteOffset(
+              new Path(basePath, currentWrittenFile + BOOK_KEEPING_FILE_OFFSET));
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return null;
+    }
+
+    if (flushedFileCounter == currentWrittenFile && dataStream == null) {
+      currentWrittenFile++;
+      fileWriteOffset = 0;
+    }
+
+    if (flushedFileCounter == skipFile && skipFile != -1) {
+      skipFile++;
+    }
+
+    if (fileWriteOffset + bytesToWrite < blockSize) {
+      try {
+        /* write length and the actual data to the file */
+        if (fileWriteOffset == 0) {
+          // writeData(flushedCounterFile, String.valueOf(currentWrittenFile).getBytes()).close();
+          dataStream = writeData(new Path(basePath, String.valueOf(currentWrittenFile)),
+              Ints.toByteArray(slice.length));
+          dataStream.write(slice.buffer, slice.offset, slice.length);
+        } else {
+          dataStream.write(Ints.toByteArray(slice.length));
+          dataStream.write(slice.buffer, slice.offset, slice.length);
+        }
+        fileWriteOffset += bytesToWrite;
+
+        byte[] fileOffset = null;
+        if ((currentWrittenFile > skipFile) || (currentWrittenFile == skipFile && fileWriteOffset > skipOffset)) {
+          skipFile = -1;
+          fileOffset = new byte[IDENTIFIER_SIZE];
+          Server.writeLong(fileOffset, 0, calculateOffset(fileWriteOffset, currentWrittenFile));
+        }
+        return fileOffset;
+      } catch (IOException ex) {
+        logger.warn("Error while storing the bytes {}", ex.getMessage());
+        closeFs();
+        throw new RuntimeException(ex);
+      }
+    }
+    DataBlock db = new DataBlock(dataStream, fileWriteOffset,
+        new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile);
+    db.close();
+    files2Commit.add(db);
+    fileWriteOffset = 0;
+    ++currentWrittenFile;
+    return store(slice);
+  }
+
+  /**
+   * Reads a little-endian four-byte value starting at startIndex and returns it as a long.
+   *
+   * @param b the byte array to read from
+   * @param startIndex the index of the first byte of the value
+   * @return the decoded value
+   */
+  long byteArrayToLong(byte[] b, int startIndex)
+  {
+    final byte b1 = 0;
+    return Longs.fromBytes(b1, b1, b1, b1, b[3 + startIndex], b[2 + startIndex], b[1 + startIndex], b[startIndex]);
+  }
+
+  @Override
+  public byte[] retrieve(byte[] identifier)
+  {
+    skipFile = -1;
+    skipOffset = 0;
+    logger.debug("retrieve with address {}", Arrays.toString(identifier));
+    // flushing the last incomplete flushed file
+    closeUnflushedFiles();
+
+    retrievalOffset = byteArrayToLong(identifier, 0);
+    retrievalFile = byteArrayToLong(identifier, offset);
+
+    if (retrievalFile == 0 && retrievalOffset == 0 && currentWrittenFile == 0 && fileWriteOffset == 0) {
+      skipOffset = 0;
+      return null;
+    }
+
+    // making sure that the deleted address is not requested again
+    if (retrievalFile != 0 || retrievalOffset != 0) {
+      long cleanedFile = byteArrayToLong(cleanedOffset, offset);
+      if (retrievalFile < cleanedFile || (retrievalFile == cleanedFile &&
+          retrievalOffset < byteArrayToLong(cleanedOffset, 0))) {
+        logger.warn("The address asked has been deleted retrievalFile={}, cleanedFile={}, retrievalOffset={}, " +
+            "cleanedOffset={}", retrievalFile, cleanedFile, retrievalOffset, byteArrayToLong(cleanedOffset, 0));
+        closeFs();
+        throw new IllegalArgumentException(String.format("The data for address %s has already been deleted",
+            Arrays.toString(identifier)));
+      }
+    }
+
+    // we have just started
+    if (retrievalFile == 0 && retrievalOffset == 0) {
+      retrievalFile = byteArrayToLong(cleanedOffset, offset);
+      retrievalOffset = byteArrayToLong(cleanedOffset, 0);
+    }
+
+    if ((retrievalFile > flushedFileCounter)) {
+      skipFile = retrievalFile;
+      skipOffset = retrievalOffset;
+      retrievalFile = -1;
+      return null;
+    }
+    if ((retrievalFile == flushedFileCounter && retrievalOffset >= flushedFileWriteOffset)) {
+      skipFile = retrievalFile;
+      skipOffset = retrievalOffset - flushedFileWriteOffset;
+      retrievalFile = -1;
+      return null;
+    }
+
+    try {
+      if (readStream != null) {
+        readStream.close();
+        readStream = null;
+      }
+      Path path = new Path(basePath, String.valueOf(retrievalFile));
+      if (!fs.exists(path)) {
+        retrievalFile = -1;
+        closeFs();
+        throw new RuntimeException(String.format("File %s does not exist", path.toString()));
+      }
+
+      byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+      flushedLong = Server.readLong(flushedOffset, 0);
+      while (retrievalOffset >= flushedLong && retrievalFile < flushedFileCounter) {
+        retrievalOffset -= flushedLong;
+        retrievalFile++;
+        flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+        flushedLong = Server.readLong(flushedOffset, 0);
+      }
+
+      if (retrievalOffset >= flushedLong) {
+        logger.warn("data not flushed for the given identifier");
+        retrievalFile = -1;
+        return null;
+      }
+      synchronized (HDFSStorage.this) {
+        if (nextReadStream != null) {
+          nextReadStream.close();
+          nextReadStream = null;
+        }
+      }
+      currentData = null;
+      path = new Path(basePath, String.valueOf(retrievalFile));
+      //readStream = new FSDataInputStream(fs.open(path));
+      currentData = readData(path);
+      //readStream.seek(retrievalOffset);
+      storageExecutor.submit(getNextStream());
+      return retrieveHelper();
+    } catch (IOException e) {
+      closeFs();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private byte[] retrieveHelper() throws IOException
+  {
+    int tempRetrievalOffset = (int)retrievalOffset;
+    int length = Ints.fromBytes(currentData[tempRetrievalOffset], currentData[tempRetrievalOffset + 1],
+        currentData[tempRetrievalOffset + 2], currentData[tempRetrievalOffset + 3]);
+    byte[] data = new byte[length + IDENTIFIER_SIZE];
+    System.arraycopy(currentData, tempRetrievalOffset + 4, data, IDENTIFIER_SIZE, length);
+    retrievalOffset += length + DATA_LENGTH_BYTE_SIZE;
+    if (retrievalOffset >= flushedLong) {
+      Server.writeLong(data, 0, calculateOffset(0, retrievalFile + 1));
+    } else {
+      Server.writeLong(data, 0, calculateOffset(retrievalOffset, retrievalFile));
+    }
+    return data;
+  }
+
+  @Override
+  public byte[] retrieveNext()
+  {
+    if (retrievalFile == -1) {
+      closeFs();
+      throw new RuntimeException("Call retrieve first");
+    }
+
+    if (retrievalFile > flushedFileCounter) {
+      logger.warn("data is not flushed");
+      return null;
+    }
+
+    try {
+      if (currentData == null) {
+        synchronized (HDFSStorage.this) {
+          if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) {
+            currentData = nextRetrievalData;
+            flushedLong = nextFlushedLong;
+            nextRetrievalData = null;
+          } else {
+            currentData = null;
+            currentData = readData(new Path(basePath, String.valueOf(retrievalFile)));
+            byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+            flushedLong = Server.readLong(flushedOffset, 0);
+          }
+        }
+        storageExecutor.submit(getNextStream());
+      }
+
+      if (retrievalOffset >= flushedLong) {
+        retrievalFile++;
+        retrievalOffset = 0;
+
+        if (retrievalFile > flushedFileCounter) {
+          logger.warn("data is not flushed");
+          return null;
+        }
+
+        //readStream.close();
+        // readStream = new FSDataInputStream(fs.open(new Path(basePath, String.valueOf(retrievalFile))));
+        // byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+        // flushedLong = Server.readLong(flushedOffset, 0);
+
+        synchronized (HDFSStorage.this) {
+          if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) {
+            currentData = nextRetrievalData;
+            flushedLong = nextFlushedLong;
+            nextRetrievalData = null;
+          } else {
+            currentData = null;
+            currentData = readData(new Path(basePath, String.valueOf(retrievalFile)));
+            byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+            flushedLong = Server.readLong(flushedOffset, 0);
+          }
+        }
+        storageExecutor.submit(getNextStream());
+      }
+      //readStream.seek(retrievalOffset);
+      return retrieveHelper();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter")
+  public void clean(byte[] identifier)
+  {
+    logger.info("clean {}", Arrays.toString(identifier));
+    long cleanFileIndex = byteArrayToLong(identifier, offset);
+
+    long cleanFileOffset = byteArrayToLong(identifier, 0);
+    if (flushedFileCounter == -1) {
+      identifier = new byte[8];
+    } else if (cleanFileIndex > flushedFileCounter ||
+        (cleanFileIndex == flushedFileCounter && cleanFileOffset >= flushedFileWriteOffset)) {
+      // This is to make sure that we clean only the data that is flushed
+      cleanFileIndex = flushedFileCounter;
+      cleanFileOffset = flushedFileWriteOffset;
+      Server.writeLong(identifier, 0, calculateOffset(cleanFileOffset, cleanFileIndex));
+    }
+    cleanedOffset = identifier;
+
+    try {
+      writeData(cleanFileOffsetFileTemp, identifier).close();
+      fs.rename(cleanFileOffsetFileTemp, cleanFileOffsetFile);
+      if (cleanedFileCounter >= cleanFileIndex) {
+        return;
+      }
+      do {
+        Path path = new Path(basePath, String.valueOf(cleanedFileCounter));
+        if (fs.exists(path) && fs.isFile(path)) {
+          fs.delete(path, false);
+        }
+        path = new Path(basePath, cleanedFileCounter + OFFSET_SUFFIX);
+        if (fs.exists(path) && fs.isFile(path)) {
+          fs.delete(path, false);
+        }
+        path = new Path(basePath, cleanedFileCounter + BOOK_KEEPING_FILE_OFFSET);
+        if (fs.exists(path) && fs.isFile(path)) {
+          fs.delete(path, false);
+        }
+        logger.info("deleted file {}", cleanedFileCounter);
+        ++cleanedFileCounter;
+      } while (cleanedFileCounter < cleanFileIndex);
+      // writeData(cleanFileCounterFile, String.valueOf(cleanedFileCounter).getBytes()).close();
+
+    } catch (IOException e) {
+      logger.warn("not able to close the streams {}", e.getMessage());
+      closeFs();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Cleans up the helper (counter and offset) files created by this storage by deleting the base path
+   */
+  void cleanHelperFiles()
+  {
+    try {
+      fs.delete(basePath, true);
+    } catch (IOException e) {
+      logger.warn(e.getMessage());
+    }
+  }
+
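+  /**
+   * Discards anything written since the last flush and resets the write position
+   * to the last flushed file and offset.
+   */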
+  private void closeUnflushedFiles()
+  {
+    try {
+      files2Commit.clear();
+      // closing the stream
+      if (dataStream != null) {
+        dataStream.close();
+        dataStream = null;
+        // currentWrittenFile++;
+        // fileWriteOffset = 0;
+      }
+
+      if (!fs.exists(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX))) {
+        fs.delete(new Path(basePath, String.valueOf(currentWrittenFile)), false);
+      }
+
+      if (fs.exists(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX))) {
+        // This means that flush was called
+        flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX));
+        bookKeepingFileOffset = getFlushedFileWriteOffset(
+            new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET));
+      }
+
+      if (flushedFileCounter != -1) {
+        currentWrittenFile = flushedFileCounter;
+        fileWriteOffset = flushedFileWriteOffset;
+      } else {
+        currentWrittenFile = 0;
+        fileWriteOffset = 0;
+      }
+
+      flushedLong = 0;
+
+    } catch (IOException e) {
+      closeFs();
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void flush()
+  {
+    nextReadStream = null;
+    StringBuilder builder = new StringBuilder();
+    Iterator<DataBlock> itr = files2Commit.iterator();
+    DataBlock db;
+    try {
+      while (itr.hasNext()) {
+        db = itr.next();
+        db.updateOffsets();
+        builder.append(db.fileName).append(", ");
+      }
+      files2Commit.clear();
+
+      if (dataStream != null) {
+        dataStream.hflush();
+        writeData(flushedCounterFileTemp, String.valueOf(currentWrittenFile).getBytes()).close();
+        fs.rename(flushedCounterFileTemp, flushedCounterFile);
+        updateFlushedOffset(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), fileWriteOffset);
+        flushedFileWriteOffset = fileWriteOffset;
+        builder.append(currentWrittenFile);
+      }
+      logger.debug("flushed files {}", builder.toString());
+    } catch (IOException ex) {
+      logger.warn("not able to close the stream {}", ex.getMessage());
+      closeFs();
+      throw new RuntimeException(ex);
+    }
+    flushedFileCounter = currentWrittenFile;
+    // logger.debug("flushedFileCounter in flush {}",flushedFileCounter);
+  }
+
+  /**
+   * This updates the flushed offset
+   */
+  private void updateFlushedOffset(Path file, long bytesWritten)
+  {
+    byte[] lastStoredOffset = new byte[IDENTIFIER_SIZE];
+    Server.writeLong(lastStoredOffset, 0, bytesWritten);
+    try {
+      writeData(file, lastStoredOffset).close();
+    } catch (IOException e) {
+      try {
+        if (!Arrays.equals(readData(file), lastStoredOffset)) {
+          closeFs();
+          throw new RuntimeException(e);
+        }
+      } catch (Exception e1) {
+        closeFs();
+        throw new RuntimeException(e1);
+      }
+    }
+  }
+
+  public int getBlockSizeMultiple()
+  {
+    return blockSizeMultiple;
+  }
+
+  public void setBlockSizeMultiple(int blockSizeMultiple)
+  {
+    this.blockSizeMultiple = blockSizeMultiple;
+  }
+
+  /**
+   * @return the baseDir
+   */
+  public String getBaseDir()
+  {
+    return baseDir;
+  }
+
+  /**
+   * @param baseDir the baseDir to set
+   */
+  public void setBaseDir(String baseDir)
+  {
+    this.baseDir = baseDir;
+  }
+
+  /**
+   * @return the id
+   */
+  public String getId()
+  {
+    return id;
+  }
+
+  /**
+   * @param id the id to set
+   */
+  public void setId(String id)
+  {
+    this.id = id;
+  }
+
+  /**
+   * @return the blockSize
+   */
+  public long getBlockSize()
+  {
+    return blockSize;
+  }
+
+  /**
+   * @param blockSize the blockSize to set
+   */
+  public void setBlockSize(long blockSize)
+  {
+    this.blockSize = blockSize;
+  }
+
+  /**
+   * @return the restore
+   */
+  public boolean isRestore()
+  {
+    return restore;
+  }
+
+  /**
+   * @param restore the restore to set
+   */
+  public void setRestore(boolean restore)
+  {
+    this.restore = restore;
+  }
+
+  class DataBlock
+  {
+    FSDataOutputStream dataStream;
+    long dataOffset;
+    Path path2FlushedData;
+    long fileName;
+    private Path bookKeepingPath;
+
+    DataBlock(FSDataOutputStream stream, long bytesWritten, Path path2FlushedData, long fileName)
+    {
+      this.dataStream = stream;
+      this.dataOffset = bytesWritten;
+      this.path2FlushedData = path2FlushedData;
+      this.fileName = fileName;
+    }
+
+    public void close()
+    {
+      if (dataStream != null) {
+        try {
+          dataStream.close();
+          bookKeepingPath = new Path(basePath, fileName + BOOK_KEEPING_FILE_OFFSET);
+          updateFlushedOffset(bookKeepingPath, dataOffset);
+        } catch (IOException ex) {
+          logger.warn("not able to close the stream {}", ex.getMessage());
+          closeFs();
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+
+    public void updateOffsets() throws IOException
+    {
+      updateFlushedOffset(path2FlushedData, dataOffset);
+      if (bookKeepingPath != null && fs.exists(bookKeepingPath)) {
+        fs.delete(bookKeepingPath, false);
+      }
+    }
+
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(HDFSStorage.class);
+
+  @Override
+  public void setup(com.datatorrent.api.Context context)
+  {
+    Configuration conf = new Configuration();
+    if (baseDir == null) {
+      baseDir = conf.get("hadoop.tmp.dir");
+      if (baseDir == null || baseDir.isEmpty()) {
+        throw new IllegalArgumentException("baseDir cannot be null.");
+      }
+    }
+    offset = 4;
+    skipOffset = -1;
+    skipFile = -1;
+    int tempRetryCount = 0;
+    while (tempRetryCount < retryCount && fs == null) {
+      try {
+        fs = FileSystem.newInstance(conf);
+        tempRetryCount++;
+      } catch (Throwable throwable) {
+        logger.warn("Not able to get file system ", throwable);
+      }
+    }
+
+    try {
+      Path path = new Path(baseDir);
+      basePath = new Path(path, id);
+      if (fs == null) {
+        fs = FileSystem.newInstance(conf);
+      }
+      if (!fs.exists(path)) {
+        closeFs();
+        throw new RuntimeException(String.format("baseDir passed (%s) doesn't exist.", baseDir));
+      }
+      if (!fs.isDirectory(path)) {
+        closeFs();
+        throw new RuntimeException(String.format("baseDir passed (%s) is not a directory.", baseDir));
+      }
+      if (!restore) {
+        fs.delete(basePath, true);
+      }
+      if (!fs.exists(basePath) || !fs.isDirectory(basePath)) {
+        fs.mkdirs(basePath);
+      }
+
+      if (blockSize == 0) {
+        blockSize = fs.getDefaultBlockSize(new Path(basePath, "tempData"));
+      }
+      if (blockSize == 0) {
+        blockSize = DEFAULT_BLOCK_SIZE;
+      }
+
+      blockSize = blockSizeMultiple * blockSize;
+
+      currentWrittenFile = 0;
+      cleanedFileCounter = -1;
+      retrievalFile = -1;
+      // fileCounterFile = new Path(basePath, IDENTITY_FILE);
+      flushedFileCounter = -1;
+      // cleanFileCounterFile = new Path(basePath, CLEAN_FILE);
+      cleanFileOffsetFile = new Path(basePath, CLEAN_OFFSET_FILE);
+      cleanFileOffsetFileTemp = new Path(basePath, CLEAN_OFFSET_FILE_TEMP);
+      flushedCounterFile = new Path(basePath, FLUSHED_IDENTITY_FILE);
+      flushedCounterFileTemp = new Path(basePath, FLUSHED_IDENTITY_FILE_TEMP);
+
+      if (restore) {
+        //
+        // if (fs.exists(fileCounterFile) && fs.isFile(fileCounterFile)) {
+        // //currentWrittenFile = Long.valueOf(new String(readData(fileCounterFile)));
+        // }
+
+        if (fs.exists(cleanFileOffsetFile) && fs.isFile(cleanFileOffsetFile)) {
+          cleanedOffset = readData(cleanFileOffsetFile);
+        }
+
+        if (fs.exists(flushedCounterFile) && fs.isFile(flushedCounterFile)) {
+          String strFlushedFileCounter = new String(readData(flushedCounterFile));
+          if (strFlushedFileCounter.isEmpty()) {
+            logger.warn("empty flushed file");
+          } else {
+            flushedFileCounter = Long.valueOf(strFlushedFileCounter);
+            flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX));
+            bookKeepingFileOffset = getFlushedFileWriteOffset(
+                new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET));
+          }
+
+        }
+      }
+      fileWriteOffset = flushedFileWriteOffset;
+      currentWrittenFile = flushedFileCounter;
+      cleanedFileCounter = byteArrayToLong(cleanedOffset, offset) - 1;
+      if (currentWrittenFile == -1) {
+        ++currentWrittenFile;
+        fileWriteOffset = 0;
+      }
+
+    } catch (IOException io) {
+
+      throw new RuntimeException(io);
+    }
+    storageExecutor = Executors.newSingleThreadExecutor(new NameableThreadFactory("StorageHelper"));
+  }
+
+  private void closeFs()
+  {
+    if (fs != null) {
+      try {
+        fs.close();
+        fs = null;
+      } catch (IOException e) {
+        logger.debug(e.getMessage());
+      }
+    }
+  }
+
+  private long getFlushedFileWriteOffset(Path filePath) throws IOException
+  {
+    if (flushedFileCounter != -1 && fs.exists(filePath)) {
+      byte[] flushedFileOffsetByte = readData(filePath);
+      if (flushedFileOffsetByte != null && flushedFileOffsetByte.length == 8) {
+        return Server.readLong(flushedFileOffsetByte, 0);
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public void teardown()
+  {
+    logger.debug("called teardown");
+    try {
+      if (readStream != null) {
+        readStream.close();
+      }
+      synchronized (HDFSStorage.this) {
+        if (nextReadStream != null) {
+          nextReadStream.close();
+        }
+      }
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      closeUnflushedFiles();
+      storageExecutor.shutdown();
+    }
+
+  }
+
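+  /**
+   * Returns a task that pre-fetches the next data file and its flushed offset in the
+   * background so that retrieveNext() can serve it without blocking on HDFS reads.
+   */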
+  private Runnable getNextStream()
+  {
+    return new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        try {
+          synchronized (HDFSStorage.this) {
+            nextRetrievalFile = retrievalFile + 1;
+            if (nextRetrievalFile > flushedFileCounter) {
+              nextRetrievalData = null;
+              return;
+            }
+            Path path = new Path(basePath, String.valueOf(nextRetrievalFile));
+            Path offsetPath = new Path(basePath, nextRetrievalFile + OFFSET_SUFFIX);
+            nextRetrievalData = null;
+            nextRetrievalData = readData(path);
+            byte[] flushedOffset = readData(offsetPath);
+            nextFlushedLong = Server.readLong(flushedOffset, 0);
+          }
+        } catch (Throwable e) {
+          logger.warn("in storage executor ", e);
+
+        }
+      }
+    };
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/storage/Storage.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/Storage.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/Storage.java
new file mode 100644
index 0000000..add1831
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/Storage.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.storage;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>Storage interface: a persistent store for byte slices addressed by eight-byte identifiers.</p>
+ *
+ * @since 0.9.2
+ */
+public interface Storage
+{
+  /**
+   * Key in the context for the unique identifier of the storage, which may be used to recover from a failure.
+   */
+  String ID = "id";
+
+  /**
+   * Stores the given bytes and returns a unique identifier that can later be used to retrieve them.
+   *
+   * @param bytes the data to store
+   * @return the eight-byte identifier for the stored bytes
+   */
+  byte[] store(Slice bytes);
+
+  /**
+   * Returns the data bytes stored at the given identifier together with the identifier of the next data bytes. <br/>
+   * The first eight bytes of the result contain the next identifier and the remaining bytes contain the data.
+   *
+   * @param identifier the eight-byte address to read from
+   * @return the next identifier followed by the data bytes
+   */
+  byte[] retrieve(byte[] identifier);
+
+  /**
+   * Returns the next data bytes together with the identifier of the data bytes after them. The position read from
+   * is determined by the preceding retrieve call and the number of retrieveNext calls made since then. <br/>
+   * The first eight bytes of the result contain the next identifier and the remaining bytes contain the data.
+   *
+   * @return the next identifier followed by the data bytes
+   */
+  byte[] retrieveNext();
+
+  /**
+   * Cleans up (deletes) the stored data up to the given identifier.
+   *
+   * @param identifier the address up to which stored data may be discarded
+   */
+  void clean(byte[] identifier);
+
+  /**
+   * Flushes any pending data to the underlying store, making it available for retrieval.
+   *
+   */
+  void flush();
+
+}
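
To make the addressing contract above concrete, here is a hypothetical walkthrough against the HDFSStorage implementation from this change (the base directory, id and standalone main are illustrative; the directory is assumed to exist on the default filesystem):

    import java.util.Arrays;

    import org.apache.apex.malhar.flume.storage.HDFSStorage;
    import com.datatorrent.netlet.util.Slice;

    public class StorageWalkthrough
    {
      public static void main(String[] args)
      {
        HDFSStorage storage = new HDFSStorage();
        storage.setBaseDir("/tmp/flume-demo");   // assumed to already exist
        storage.setId("demo");
        storage.setRestore(false);
        storage.setup(null);                     // the context argument is not used by setup()

        byte[] payload = "event-1".getBytes();
        storage.store(new Slice(payload, 0, payload.length));
        storage.flush();                         // data becomes retrievable only after flush()

        // Start from the beginning: the all-zero address.
        byte[] data = storage.retrieve(new byte[8]);
        // The first eight bytes carry the identifier of the next item, the rest is the payload.
        byte[] next = Arrays.copyOfRange(data, 0, 8);
        byte[] body = Arrays.copyOfRange(data, 8, data.length);
        System.out.println(new String(body));    // event-1

        storage.clean(next);                     // data up to 'next' may be discarded
        storage.teardown();
      }
    }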

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/resources/flume-conf/flume-conf.sample.properties
----------------------------------------------------------------------
diff --git a/flume/src/main/resources/flume-conf/flume-conf.sample.properties b/flume/src/main/resources/flume-conf/flume-conf.sample.properties
index 1782d4a..af59e52 100644
--- a/flume/src/main/resources/flume-conf/flume-conf.sample.properties
+++ b/flume/src/main/resources/flume-conf/flume-conf.sample.properties
@@ -23,7 +23,7 @@
  agent1.sinks = dt
 
 # first sink - dt
- agent1.sinks.dt.type = com.datatorrent.flume.sink.DTFlumeSink
+ agent1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink
  agent1.sinks.dt.id = sink1
  agent1.sinks.dt.hostname = localhost
  agent1.sinks.dt.port = 8080
@@ -31,7 +31,7 @@
  agent1.sinks.dt.throughputAdjustmentFactor = 2
  agent1.sinks.dt.maximumEventsPerTransaction = 5000
  agent1.sinks.dt.minimumEventsPerTransaction = 1
- agent1.sinks.dt.storage = com.datatorrent.flume.storage.HDFSStorage
+ agent1.sinks.dt.storage = org.apache.apex.malhar.flume.storage.HDFSStorage
  agent1.sinks.dt.storage.restore = false
  agent1.sinks.dt.storage.baseDir = /tmp/flume101
  agent1.sinks.dt.channel = ch1

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java b/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java
deleted file mode 100644
index f182edc..0000000
--- a/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.discovery;
-
-import org.codehaus.jackson.type.TypeReference;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.curator.x.discovery.details.InstanceSerializer;
-
-import com.datatorrent.flume.discovery.Discovery.Service;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- *
- */
-@Ignore
-public class ZKAssistedDiscoveryTest
-{
-  public ZKAssistedDiscoveryTest()
-  {
-  }
-
-  @Test
-  public void testSerialization() throws Exception
-  {
-    ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
-    discovery.setServiceName("DTFlumeTest");
-    discovery.setConnectionString("localhost:2181");
-    discovery.setBasePath("/HelloDT");
-    discovery.setup(null);
-    ServiceInstance<byte[]> instance = discovery.getInstance(new Service<byte[]>()
-    {
-      @Override
-      public String getHost()
-      {
-        return "localhost";
-      }
-
-      @Override
-      public int getPort()
-      {
-        return 8080;
-      }
-
-      @Override
-      public byte[] getPayload()
-      {
-        return null;
-      }
-
-      @Override
-      public String getId()
-      {
-        return "localhost8080";
-      }
-
-    });
-    InstanceSerializer<byte[]> instanceSerializer =
-        discovery.getInstanceSerializerFactory().getInstanceSerializer(new TypeReference<ServiceInstance<byte[]>>()
-        {
-        });
-    byte[] serialize = instanceSerializer.serialize(instance);
-    logger.debug("serialized json = {}", new String(serialize));
-    ServiceInstance<byte[]> deserialize = instanceSerializer.deserialize(serialize);
-    assertArrayEquals("Metadata", instance.getPayload(), deserialize.getPayload());
-  }
-
-  @Test
-  public void testDiscover()
-  {
-    ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
-    discovery.setServiceName("DTFlumeTest");
-    discovery.setConnectionString("localhost:2181");
-    discovery.setBasePath("/HelloDT");
-    discovery.setup(null);
-    assertNotNull("Discovered Sinks", discovery.discover());
-    discovery.teardown();
-  }
-
-  @Test
-  public void testAdvertize()
-  {
-    ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
-    discovery.setServiceName("DTFlumeTest");
-    discovery.setConnectionString("localhost:2181");
-    discovery.setBasePath("/HelloDT");
-    discovery.setup(null);
-
-    Service<byte[]> service = new Service<byte[]>()
-    {
-      @Override
-      public String getHost()
-      {
-        return "chetan";
-      }
-
-      @Override
-      public int getPort()
-      {
-        return 5033;
-      }
-
-      @Override
-      public byte[] getPayload()
-      {
-        return new byte[] {3, 2, 1};
-      }
-
-      @Override
-      public String getId()
-      {
-        return "uniqueId";
-      }
-
-    };
-    discovery.advertise(service);
-    discovery.teardown();
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscoveryTest.class);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java b/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java
deleted file mode 100644
index 8256916..0000000
--- a/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.integration;
-
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Event;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.flume.operator.AbstractFlumeInputOperator;
-import com.datatorrent.flume.storage.EventCodec;
-
-/**
- *
- */
-@Ignore
-public class ApplicationTest implements StreamingApplication
-{
-  public static class FlumeInputOperator extends AbstractFlumeInputOperator<Event>
-  {
-    @Override
-    public Event convert(Event event)
-    {
-      return event;
-    }
-  }
-
-  public static class Counter implements Operator
-  {
-    private int count;
-    private transient Event event;
-    public final transient DefaultInputPort<Event> input = new DefaultInputPort<Event>()
-    {
-      @Override
-      public void process(Event tuple)
-      {
-        count++;
-        event = tuple;
-      }
-
-    };
-
-    @Override
-    public void beginWindow(long windowId)
-    {
-    }
-
-    @Override
-    public void endWindow()
-    {
-      logger.debug("total count = {}, tuple = {}", count, event);
-    }
-
-    @Override
-    public void setup(OperatorContext context)
-    {
-    }
-
-    @Override
-    public void teardown()
-    {
-    }
-
-    private static final Logger logger = LoggerFactory.getLogger(Counter.class);
-  }
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 1000);
-    FlumeInputOperator flume = dag.addOperator("FlumeOperator", new FlumeInputOperator());
-    flume.setConnectAddresses(new String[]{"test:127.0.0.1:8080"});
-    flume.setCodec(new EventCodec());
-    Counter counter = dag.addOperator("Counter", new Counter());
-
-    dag.addStream("Slices", flume.output, counter.input).setLocality(Locality.CONTAINER_LOCAL);
-  }
-
-  @Test
-  public void test()
-  {
-    try {
-      LocalMode.runApp(this, Integer.MAX_VALUE);
-    } catch (Exception ex) {
-      logger.warn("The dag seems to be not testable yet, if it's - remove this exception handling", ex);
-    }
-
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
deleted file mode 100644
index aca99c3..0000000
--- a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.interceptor;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.flume.Context;
-import org.apache.flume.interceptor.Interceptor;
-
-import static org.junit.Assert.assertArrayEquals;
-
-/**
- * Tests for {@link ColumnFilteringFormattingInterceptor}
- */
-public class ColumnFilteringFormattingInterceptorTest
-{
-  private static InterceptorTestHelper helper;
-
-  @BeforeClass
-  public static void startUp()
-  {
-    HashMap<String, String> contextMap = new HashMap<String, String>();
-    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
-    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "{1}\001{2}\001{3}\001");
-
-    helper = new InterceptorTestHelper(new ColumnFilteringFormattingInterceptor.Builder(), contextMap);
-  }
-
-  @Test
-  public void testInterceptEvent()
-  {
-    helper.testIntercept_Event();
-  }
-
-  @Test
-  public void testFiles() throws IOException, URISyntaxException
-  {
-    helper.testFiles();
-  }
-
-  @Test
-  public void testInterceptEventWithPrefix()
-  {
-    HashMap<String, String> contextMap = new HashMap<String, String>();
-    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
-    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "\001{1}\001{2}\001{3}\001");
-
-    ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
-    builder.configure(new Context(contextMap));
-    Interceptor interceptor = builder.build();
-
-    assertArrayEquals("Six Fields",
-        "\001\001Second\001\001".getBytes(),
-        interceptor.intercept(
-        new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody());
-  }
-
-  @Test
-  public void testInterceptEventWithLongSeparator()
-  {
-    HashMap<String, String> contextMap = new HashMap<String, String>();
-    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
-    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "a{1}bc{2}def{3}ghi");
-
-    ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
-    builder.configure(new Context(contextMap));
-    Interceptor interceptor = builder.build();
-    byte[] body = interceptor.intercept(
-        new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody();
-
-    assertArrayEquals("Six Fields, " + new String(body), "abcSeconddefghi".getBytes(), body);
-  }
-
-  @Test
-  public void testInterceptEventWithTerminatingSeparator()
-  {
-    HashMap<String, String> contextMap = new HashMap<String, String>();
-    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
-    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "a{1}bc{2}def{3}");
-
-    ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
-    builder.configure(new Context(contextMap));
-    Interceptor interceptor = builder.build();
-    byte[] body = interceptor.intercept(
-        new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody();
-
-    assertArrayEquals("Six Fields, " + new String(body), "abcSeconddef".getBytes(), body);
-  }
-
-  @Test
-  public void testInterceptEventWithColumnZero()
-  {
-    HashMap<String, String> contextMap = new HashMap<String, String>();
-    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
-    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "{0}\001");
-
-    ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
-    builder.configure(new Context(contextMap));
-    Interceptor interceptor = builder.build();
-
-    assertArrayEquals("Empty Bytes",
-        "\001".getBytes(),
-        interceptor.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody());
-
-    assertArrayEquals("One Field",
-        "First\001".getBytes(),
-        interceptor.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody());
-
-    assertArrayEquals("Two Fields",
-        "\001".getBytes(),
-        interceptor.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody());
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java
deleted file mode 100644
index 11ee23f..0000000
--- a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.interceptor;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.flume.Context;
-import org.apache.flume.interceptor.Interceptor;
-
-import static org.junit.Assert.assertArrayEquals;
-
-/**
- *
- */
-public class ColumnFilteringInterceptorTest
-{
-  private static InterceptorTestHelper helper;
-
-  @BeforeClass
-  public static void startUp()
-  {
-    HashMap<String, String> contextMap = new HashMap<String, String>();
-    contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1));
-    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
-    contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "1 2 3");
-
-    helper = new InterceptorTestHelper(new ColumnFilteringInterceptor.Builder(), contextMap);
-  }
-
-  @Test
-  public void testInterceptEvent()
-  {
-    helper.testIntercept_Event();
-  }
-
-  @Test
-  public void testFiles() throws IOException, URISyntaxException
-  {
-    helper.testFiles();
-  }
-
-  @Test
-  public void testInterceptEventWithColumnZero()
-  {
-    HashMap<String, String> contextMap = new HashMap<String, String>();
-    contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1));
-    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
-    contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "0");
-
-    ColumnFilteringInterceptor.Builder builder = new ColumnFilteringInterceptor.Builder();
-    builder.configure(new Context(contextMap));
-    Interceptor interceptor = builder.build();
-
-    assertArrayEquals("Empty Bytes",
-        "\001".getBytes(),
-        interceptor.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody());
-
-    assertArrayEquals("One Field",
-        "First\001".getBytes(),
-        interceptor.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody());
-
-    assertArrayEquals("Two Fields",
-        "\001".getBytes(),
-        interceptor.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody());
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java b/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java
deleted file mode 100644
index dc95f08..0000000
--- a/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.interceptor;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.interceptor.Interceptor;
-
-import com.datatorrent.netlet.util.Slice;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- *
- */
-public class InterceptorTestHelper
-{
-  private static final byte FIELD_SEPARATOR = 1;
-
-  static class MyEvent implements Event
-  {
-    byte[] body;
-
-    MyEvent(byte[] bytes)
-    {
-      body = bytes;
-    }
-
-    @Override
-    public Map<String, String> getHeaders()
-    {
-      return null;
-    }
-
-    @Override
-    public void setHeaders(Map<String, String> map)
-    {
-    }
-
-    @Override
-    @SuppressWarnings("ReturnOfCollectionOrArrayField")
-    public byte[] getBody()
-    {
-      return body;
-    }
-
-    @Override
-    @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter")
-    public void setBody(byte[] bytes)
-    {
-      body = bytes;
-    }
-  }
-
-  private final Interceptor.Builder builder;
-  private final Map<String, String> context;
-
-  InterceptorTestHelper(Interceptor.Builder builder, Map<String, String> context)
-  {
-    this.builder = builder;
-    this.context = context;
-  }
-
-  public void testIntercept_Event()
-  {
-    builder.configure(new Context(context));
-    Interceptor interceptor = builder.build();
-
-    assertArrayEquals("Empty Bytes",
-        "\001\001\001".getBytes(),
-        interceptor.intercept(new MyEvent("".getBytes())).getBody());
-
-    assertArrayEquals("One Separator",
-        "\001\001\001".getBytes(),
-        interceptor.intercept(new MyEvent("\002".getBytes())).getBody());
-
-    assertArrayEquals("Two Separators",
-        "\001\001\001".getBytes(),
-        interceptor.intercept(new MyEvent("\002\002".getBytes())).getBody());
-
-    assertArrayEquals("One Field",
-        "\001\001\001".getBytes(),
-        interceptor.intercept(new MyEvent("First".getBytes())).getBody());
-
-    assertArrayEquals("Two Fields",
-        "First\001\001\001".getBytes(),
-        interceptor.intercept(new MyEvent("\002First".getBytes())).getBody());
-
-    assertArrayEquals("Two Fields",
-        "\001\001\001".getBytes(),
-        interceptor.intercept(new MyEvent("First\001".getBytes())).getBody());
-
-    assertArrayEquals("Two Fields",
-        "Second\001\001\001".getBytes(),
-        interceptor.intercept(new MyEvent("First\002Second".getBytes())).getBody());
-
-    assertArrayEquals("Three Fields",
-        "Second\001\001\001".getBytes(),
-        interceptor.intercept(new MyEvent("First\002Second\002".getBytes())).getBody());
-
-    assertArrayEquals("Three Fields",
-        "\001Second\001\001".getBytes(),
-        interceptor.intercept(new MyEvent("First\002\002Second".getBytes())).getBody());
-
-    assertArrayEquals("Four Fields",
-        "\001Second\001\001".getBytes(),
-        interceptor.intercept(new MyEvent("First\002\002Second\002".getBytes())).getBody());
-
-    assertArrayEquals("Five Fields",
-        "\001Second\001\001".getBytes(),
-        interceptor.intercept(new MyEvent("First\002\002Second\002\002".getBytes())).getBody());
-
-    assertArrayEquals("Six Fields",
-        "\001Second\001\001".getBytes(),
-        interceptor.intercept(new MyEvent("First\002\002Second\002\002\002".getBytes())).getBody());
-  }
-
-  public void testFiles() throws IOException, URISyntaxException
-  {
-    Properties properties = new Properties();
-    properties.load(getClass().getResourceAsStream("/flume/conf/flume-conf.properties"));
-
-    String interceptor = null;
-    for (Entry<Object, Object> entry : properties.entrySet()) {
-      logger.debug("{} => {}", entry.getKey(), entry.getValue());
-
-      if (builder.getClass().getName().equals(entry.getValue().toString())) {
-        String key = entry.getKey().toString();
-        if (key.endsWith(".type")) {
-          interceptor = key.substring(0, key.length() - "type".length());
-          break;
-        }
-      }
-    }
-
-    assertNotNull(builder.getClass().getName(), interceptor);
-    @SuppressWarnings({"null", "ConstantConditions"})
-    final int interceptorLength = interceptor.length();
-
-    HashMap<String, String> map = new HashMap<String, String>();
-    for (Entry<Object, Object> entry : properties.entrySet()) {
-      String key = entry.getKey().toString();
-      if (key.startsWith(interceptor)) {
-        map.put(key.substring(interceptorLength), entry.getValue().toString());
-      }
-    }
-
-    builder.configure(new Context(map));
-    Interceptor interceptorInstance = builder.build();
-
-    URL url = getClass().getResource("/test_data/gentxns/");
-    assertNotNull("Generated Transactions", url);
-
-    int records = 0;
-    File dir = new File(url.toURI());
-    for (File file : dir.listFiles()) {
-      records += processFile(file, interceptorInstance);
-    }
-
-    Assert.assertEquals("Total Records", 2200, records);
-  }
-
-  private int processFile(File file, Interceptor interceptor) throws IOException
-  {
-    InputStream stream = getClass().getResourceAsStream("/test_data/gentxns/" + file.getName());
-    BufferedReader br = new BufferedReader(new InputStreamReader(stream));
-
-    String line;
-    int i = 0;
-    while ((line = br.readLine()) != null) {
-      byte[] body = interceptor.intercept(new MyEvent(line.getBytes())).getBody();
-      RawEvent event = RawEvent.from(body, FIELD_SEPARATOR);
-      Assert.assertEquals("GUID", new Slice(line.getBytes(), 0, 32), event.guid);
-      logger.debug("guid = {}, time = {}", event.guid, event.time);
-      i++;
-    }
-
-    br.close();
-    return i;
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(InterceptorTestHelper.class);
-}
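
For reference, the configure-then-intercept flow that these removed interceptor tests exercise condenses to a few lines. The sketch below keeps the classes under the com.datatorrent.flume.interceptor package used above and wraps the body with Flume's EventBuilder instead of the test-local MyEvent; that substitution is the only assumption, everything else mirrors calls visible in the removed code.

package com.datatorrent.flume.interceptor;

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

public class ColumnFilteringSketch
{
  public static void main(String[] args)
  {
    // Same keys the removed tests set: byte 2 separates source columns,
    // byte 1 separates the retained columns, and columns 1..3 are kept.
    Map<String, String> contextMap = new HashMap<String, String>();
    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
    contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1));
    contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "1 2 3");

    ColumnFilteringInterceptor.Builder builder = new ColumnFilteringInterceptor.Builder();
    builder.configure(new Context(contextMap));
    Interceptor interceptor = builder.build();

    // EventBuilder.withBody is assumed here; the removed tests wrap the bytes
    // in their own MyEvent implementation instead.
    Event in = EventBuilder.withBody("First\002\002Second\002\002\002".getBytes());
    byte[] out = interceptor.intercept(in).getBody();

    // Per the "Six Fields" assertion in the removed helper, the expected body
    // is "\001Second\001\001".
    System.out.println(new String(out));
  }
}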

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java b/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java
deleted file mode 100644
index c029cd0..0000000
--- a/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.interceptor;
-
-import java.io.Serializable;
-
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- *
- */
-public class RawEvent implements Serializable
-{
-  public Slice guid;
-  public long time;
-  public int dimensionsOffset;
-
-  public Slice getGUID()
-  {
-    return guid;
-  }
-
-  public long getTime()
-  {
-    return time;
-  }
-
-  RawEvent()
-  {
-    /* needed for Kryo serialization */
-  }
-
-  public static RawEvent from(byte[] row, byte separator)
-  {
-    final int rowsize = row.length;
-
-    /*
-     * Lets get the guid out of the current record
-     */
-    int sliceLengh = -1;
-    while (++sliceLengh < rowsize) {
-      if (row[sliceLengh] == separator) {
-        break;
-      }
-    }
-
-    int i = sliceLengh + 1;
-
-    /* lets parse the date */
-    int dateStart = i;
-    while (i < rowsize) {
-      if (row[i++] == separator) {
-        long time = DATE_PARSER.parseMillis(new String(row, dateStart, i - dateStart - 1));
-        RawEvent event = new RawEvent();
-        event.guid = new Slice(row, 0, sliceLengh);
-        event.time = time;
-        event.dimensionsOffset = i;
-        return event;
-      }
-    }
-
-    return null;
-  }
-
-  @Override
-  public int hashCode()
-  {
-    int hash = 5;
-    hash = 61 * hash + (this.guid != null ? this.guid.hashCode() : 0);
-    hash = 61 * hash + (int)(this.time ^ (this.time >>> 32));
-    return hash;
-  }
-
-  @Override
-  public String toString()
-  {
-    return "RawEvent{" + "guid=" + guid + ", time=" + time + '}';
-  }
-
-  @Override
-  public boolean equals(Object obj)
-  {
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    final RawEvent other = (RawEvent)obj;
-    if (this.guid != other.guid && (this.guid == null || !this.guid.equals(other.guid))) {
-      return false;
-    }
-    return this.time == other.time;
-  }
-
-  private static final DateTimeFormatter DATE_PARSER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
-  private static final Logger logger = LoggerFactory.getLogger(RawEvent.class);
-  private static final long serialVersionUID = 201312191312L;
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java b/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java
deleted file mode 100644
index 2f162a8..0000000
--- a/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.operator;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- *
- */
-public class AbstractFlumeInputOperatorTest
-{
-  public AbstractFlumeInputOperatorTest()
-  {
-  }
-
-  @Test
-  public void testThreadLocal()
-  {
-    ThreadLocal<Set<Integer>> tl = new ThreadLocal<Set<Integer>>()
-    {
-      @Override
-      protected Set<Integer> initialValue()
-      {
-        return new HashSet<Integer>();
-      }
-
-    };
-    Set<Integer> get1 = tl.get();
-    get1.add(1);
-    assertTrue("Just Added Value", get1.contains(1));
-
-    Set<Integer> get2 = tl.get();
-    assertTrue("Previously added value", get2.contains(1));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java b/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java
deleted file mode 100644
index 7949e63..0000000
--- a/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.sink;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.channel.MemoryChannel;
-
-import com.datatorrent.flume.discovery.Discovery;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
-import com.datatorrent.netlet.DefaultEventLoop;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- *
- */
-public class DTFlumeSinkTest
-{
-  static final String hostname = "localhost";
-  int port = 0;
-
-  @Test
-  @SuppressWarnings("SleepWhileInLoop")
-  public void testServer() throws InterruptedException, IOException
-  {
-    Discovery<byte[]> discovery = new Discovery<byte[]>()
-    {
-      @Override
-      public synchronized void unadvertise(Service<byte[]> service)
-      {
-        notify();
-      }
-
-      @Override
-      public synchronized void advertise(Service<byte[]> service)
-      {
-        port = service.getPort();
-        logger.debug("listening at {}", service);
-        notify();
-      }
-
-      @Override
-      @SuppressWarnings("unchecked")
-      public synchronized Collection<Service<byte[]>> discover()
-      {
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-          throw new RuntimeException(ie);
-        }
-        return Collections.EMPTY_LIST;
-      }
-
-    };
-    DTFlumeSink sink = new DTFlumeSink();
-    sink.setName("TeskSink");
-    sink.setHostname(hostname);
-    sink.setPort(0);
-    sink.setAcceptedTolerance(2000);
-    sink.setChannel(new MemoryChannel());
-    sink.setDiscovery(discovery);
-    sink.start();
-    AbstractLengthPrependerClient client = new AbstractLengthPrependerClient()
-    {
-      private byte[] array;
-      private int offset = 2;
-
-      @Override
-      public void onMessage(byte[] buffer, int offset, int size)
-      {
-        Slice received = new Slice(buffer, offset, size);
-        logger.debug("Client Received = {}", received);
-        Assert.assertEquals(received,
-            new Slice(Arrays.copyOfRange(array, this.offset, array.length), 0, Server.Request.FIXED_SIZE));
-        synchronized (DTFlumeSinkTest.this) {
-          DTFlumeSinkTest.this.notify();
-        }
-      }
-
-      @Override
-      public void connected()
-      {
-        super.connected();
-        array = new byte[Server.Request.FIXED_SIZE + offset];
-        array[offset] = Server.Command.ECHO.getOrdinal();
-        array[offset + 1] = 1;
-        array[offset + 2] = 2;
-        array[offset + 3] = 3;
-        array[offset + 4] = 4;
-        array[offset + 5] = 5;
-        array[offset + 6] = 6;
-        array[offset + 7] = 7;
-        array[offset + 8] = 8;
-        Server.writeLong(array, offset + Server.Request.TIME_OFFSET, System.currentTimeMillis());
-        write(array, offset, Server.Request.FIXED_SIZE);
-      }
-
-    };
-
-    DefaultEventLoop eventloop = new DefaultEventLoop("Eventloop-TestClient");
-    eventloop.start();
-    discovery.discover();
-    try {
-      eventloop.connect(new InetSocketAddress(hostname, port), client);
-      try {
-        synchronized (this) {
-          this.wait();
-        }
-      } finally {
-        eventloop.disconnect(client);
-      }
-    } finally {
-      eventloop.stop();
-    }
-
-    sink.stop();
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(DTFlumeSinkTest.class);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java b/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java
deleted file mode 100644
index 8c225d1..0000000
--- a/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.sink;
-
-import java.util.Random;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- *
- */
-public class ServerTest
-{
-  byte[] array;
-
-  public ServerTest()
-  {
-    array = new byte[1024];
-  }
-
-  @Test
-  public void testInt()
-  {
-    Server.writeInt(array, 0, Integer.MAX_VALUE);
-    Assert.assertEquals("Max Integer", Integer.MAX_VALUE, Server.readInt(array, 0));
-
-    Server.writeInt(array, 0, Integer.MIN_VALUE);
-    Assert.assertEquals("Min Integer", Integer.MIN_VALUE, Server.readInt(array, 0));
-
-    Server.writeInt(array, 0, 0);
-    Assert.assertEquals("Zero Integer", 0, Server.readInt(array, 0));
-
-    Random rand = new Random();
-    for (int i = 0; i < 128; i++) {
-      int n = rand.nextInt();
-      if (rand.nextBoolean()) {
-        n = -n;
-      }
-      Server.writeInt(array, 0, n);
-      Assert.assertEquals("Random Integer", n, Server.readInt(array, 0));
-    }
-  }
-
-  @Test
-  public void testLong()
-  {
-    Server.writeLong(array, 0, Integer.MAX_VALUE);
-    Assert.assertEquals("Max Integer", Integer.MAX_VALUE, Server.readLong(array, 0));
-
-    Server.writeLong(array, 0, Integer.MIN_VALUE);
-    Assert.assertEquals("Min Integer", Integer.MIN_VALUE, Server.readLong(array, 0));
-
-    Server.writeLong(array, 0, 0);
-    Assert.assertEquals("Zero Integer", 0L, Server.readLong(array, 0));
-
-    Server.writeLong(array, 0, Long.MAX_VALUE);
-    Assert.assertEquals("Max Long", Long.MAX_VALUE, Server.readLong(array, 0));
-
-    Server.writeLong(array, 0, Long.MIN_VALUE);
-    Assert.assertEquals("Min Long", Long.MIN_VALUE, Server.readLong(array, 0));
-
-    Server.writeLong(array, 0, 0L);
-    Assert.assertEquals("Zero Long", 0L, Server.readLong(array, 0));
-
-    Random rand = new Random();
-    for (int i = 0; i < 128; i++) {
-      long n = rand.nextLong();
-      if (rand.nextBoolean()) {
-        n = -n;
-      }
-      Server.writeLong(array, 0, n);
-      Assert.assertEquals("Random Long", n, Server.readLong(array, 0));
-    }
-  }
-
-}
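
The removed ServerTest round-trips ints and longs through Server's fixed-offset codec. A compact version of that round trip is sketched below, placed in the same com.datatorrent.flume.sink package as the removed tests so package-visible members resolve; the nonzero offset for the long mirrors how DTFlumeSinkTest writes the request timestamp, and only the writeInt/readInt and writeLong/readLong calls already visible in the removed code are used.

package com.datatorrent.flume.sink;

public class ServerCodecSketch
{
  public static void main(String[] args)
  {
    byte[] buffer = new byte[1024];

    // 32-bit round trip at offset 0, as in the removed testInt().
    Server.writeInt(buffer, 0, 123456789);
    System.out.println("int ok: " + (Server.readInt(buffer, 0) == 123456789));

    // 64-bit round trip at a nonzero offset; the removed sink test writes the
    // timestamp the same way via Server.writeLong(array, offset + ..., time).
    long now = System.currentTimeMillis();
    Server.writeLong(buffer, 16, now);
    System.out.println("long ok: " + (Server.readLong(buffer, 16) == now));
  }
}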

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java
deleted file mode 100644
index 6b6adcb..0000000
--- a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.primitives.Ints;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- *
- */
-public class HDFSStorageMatching
-{
-
-  public static void main(String[] args)
-  {
-    HDFSStorage storage = new HDFSStorage();
-    storage.setBaseDir(args[0]);
-    storage.setId(args[1]);
-    storage.setRestore(true);
-    storage.setup(null);
-    int count = 100000000;
-
-    logger.debug(" start time {}", System.currentTimeMillis());
-    int index = 10000;
-    byte[] b = Ints.toByteArray(index);
-    for (int i = 0; i < count; i++) {
-      storage.store(new Slice(b, 0, b.length));
-      index++;
-      b = Ints.toByteArray(index);
-    }
-    storage.flush();
-    for (int i = 0; i < count; i++) {
-      storage.store(new Slice(b, 0, b.length));
-      index++;
-      b = Ints.toByteArray(index);
-    }
-    storage.flush();
-    for (int i = 0; i < count; i++) {
-      storage.store(new Slice(b, 0, b.length));
-      index++;
-      b = Ints.toByteArray(index);
-    }
-    storage.flush();
-    for (int i = 0; i < count; i++) {
-      storage.store(new Slice(b, 0, b.length));
-      index++;
-      b = Ints.toByteArray(index);
-    }
-    storage.flush();
-    for (int i = 0; i < count; i++) {
-      storage.store(new Slice(b, 0, b.length));
-      index++;
-      b = Ints.toByteArray(index);
-    }
-    storage.flush();
-    logger.debug(" end time {}", System.currentTimeMillis());
-    logger.debug(" start time for retrieve {}", System.currentTimeMillis());
-    b = storage.retrieve(new byte[8]);
-    int org_index = index;
-    index = 10000;
-    match(b, index);
-    while (true) {
-      index++;
-      b = storage.retrieveNext();
-      if (b == null) {
-        logger.debug(" end time for retrieve {}/{}/{}", System.currentTimeMillis(), index, org_index);
-        return;
-      } else {
-        if (!match(b, index)) {
-          throw new RuntimeException("failed : " + index);
-        }
-      }
-    }
-
-  }
-
-  public static boolean match(byte[] data, int match)
-  {
-    byte[] tempData = new byte[data.length - 8];
-    System.arraycopy(data, 8, tempData, 0, tempData.length);
-    int dataR = Ints.fromByteArray(tempData);
-    //logger.debug("input: {}, output: {}",match,dataR);
-    if (match == dataR) {
-      return true;
-    }
-    return false;
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(HDFSStorageMatching.class);
-}
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java
deleted file mode 100644
index 098f3f7..0000000
--- a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- *
- */
-public class HDFSStoragePerformance
-{
-
-  public static void main(String[] args)
-  {
-    HDFSStorage storage = new HDFSStorage();
-    storage.setBaseDir(".");
-    storage.setId("gaurav_flume_1");
-    storage.setRestore(true);
-    storage.setup(null);
-    int count = 1000000;
-
-    logger.debug(" start time {}", System.currentTimeMillis());
-    int index = 10000;
-    byte[] b = new byte[1024];
-    for (int i = 0; i < count; i++) {
-      storage.store(new Slice(b, 0, b.length));
-    }
-    storage.flush();
-    for (int i = 0; i < count; i++) {
-      storage.store(new Slice(b, 0, b.length));
-    }
-    storage.flush();
-    for (int i = 0; i < count; i++) {
-      storage.store(new Slice(b, 0, b.length));
-    }
-    storage.flush();
-    logger.debug(" end time {}", System.currentTimeMillis());
-    logger.debug(" start time for retrieve {}", System.currentTimeMillis());
-    storage.retrieve(new byte[8]);
-    String inputData = new String(b);
-    index = 1;
-    while (true) {
-      b = storage.retrieveNext();
-      if (b == null) {
-        logger.debug(" end time for retrieve {}", System.currentTimeMillis());
-        return;
-      } else {
-        if (!match(b, inputData)) {
-          throw new RuntimeException("failed : " + index);
-        }
-      }
-
-      index++;
-    }
-
-  }
-
-  public static boolean match(byte[] data, String match)
-  {
-    byte[] tempData = new byte[data.length - 8];
-    System.arraycopy(data, 8, tempData, 0, tempData.length);
-//    logger.debug("input: {}, output: {}",match,new String(tempData));
-    return (match.equals(new String(tempData)));
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(HDFSStoragePerformance.class);
-}
-
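
Both removed benchmarks drive the same store/flush/replay cycle on HDFSStorage. The condensed sketch below keeps only that cycle and sits in the same com.datatorrent.flume.storage package as the removed classes; every setter and retrieval call is one the benchmarks themselves make, while the meaning of the byte[8] argument to retrieve and the 8-byte prefix on returned records is inferred from their usage and should be treated as an assumption.

package com.datatorrent.flume.storage;

import com.google.common.primitives.Ints;

import com.datatorrent.netlet.util.Slice;

public class HDFSStorageCycleSketch
{
  public static void main(String[] args)
  {
    // args[0] = base directory, args[1] = storage id, as in HDFSStorageMatching.
    HDFSStorage storage = new HDFSStorage();
    storage.setBaseDir(args[0]);
    storage.setId(args[1]);
    storage.setRestore(true);
    storage.setup(null);

    // Store a batch of integer payloads, then make them durable.
    for (int i = 0; i < 1000; i++) {
      byte[] b = Ints.toByteArray(i);
      storage.store(new Slice(b, 0, b.length));
    }
    storage.flush();

    // Replay: retrieve(new byte[8]) returns the first stored record in the
    // removed benchmarks; retrieveNext() then walks forward until it returns
    // null. Each returned record carries an 8-byte prefix that the benchmarks
    // skip before comparing payloads.
    int replayed = 0;
    for (byte[] data = storage.retrieve(new byte[8]); data != null; data = storage.retrieveNext()) {
      replayed++;
    }
    System.out.println("replayed " + replayed + " records");
  }
}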