Posted to commits@apex.apache.org by th...@apache.org on 2017/05/23 01:24:04 UTC

[06/13] apex-malhar git commit: Changed package path for files to be included under malhar. Modifications to build files for project to build under malhar.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/sink/Server.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/sink/Server.java b/flume/src/main/java/com/datatorrent/flume/sink/Server.java
deleted file mode 100644
index 03c1ff0..0000000
--- a/flume/src/main/java/com/datatorrent/flume/sink/Server.java
+++ /dev/null
@@ -1,420 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.sink;
-
-import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.flume.discovery.Discovery;
-import com.datatorrent.flume.discovery.Discovery.Service;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
-import com.datatorrent.netlet.AbstractServer;
-import com.datatorrent.netlet.EventLoop;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * <p>
- * Server class.</p>
- *
- * @since 0.9.2
- */
-public class Server extends AbstractServer
-{
-  private final String id;
-  private final Discovery<byte[]> discovery;
-  private final long acceptedTolerance;
-
-  public Server(String id, Discovery<byte[]> discovery, long acceptedTolerance)
-  {
-    this.id = id;
-    this.discovery = discovery;
-    this.acceptedTolerance = acceptedTolerance;
-  }
-
-  @Override
-  public void handleException(Exception cce, EventLoop el)
-  {
-    logger.error("Server Error", cce);
-    Request r = new Request(Command.SERVER_ERROR, null)
-    {
-      @Override
-      public Slice getAddress()
-      {
-        throw new UnsupportedOperationException("Not supported yet.");
-      }
-
-      @Override
-      public int getEventCount()
-      {
-        throw new UnsupportedOperationException("Not supported yet.");
-      }
-
-      @Override
-      public int getIdleCount()
-      {
-        throw new UnsupportedOperationException("Not supported yet.");
-      }
-
-    };
-    synchronized (requests) {
-      requests.add(r);
-    }
-  }
-
-  private final Service<byte[]> service = new Service<byte[]>()
-  {
-    @Override
-    public String getHost()
-    {
-      return ((InetSocketAddress)getServerAddress()).getHostName();
-    }
-
-    @Override
-    public int getPort()
-    {
-      return ((InetSocketAddress)getServerAddress()).getPort();
-    }
-
-    @Override
-    public byte[] getPayload()
-    {
-      return null;
-    }
-
-    @Override
-    public String getId()
-    {
-      return id;
-    }
-
-    @Override
-    public String toString()
-    {
-      return "Server.Service{id=" + id + ", host=" + getHost() + ", port=" + getPort() + ", payload=" +
-          Arrays.toString(getPayload()) + '}';
-    }
-
-  };
-
-  @Override
-  public void unregistered(final SelectionKey key)
-  {
-    discovery.unadvertise(service);
-    super.unregistered(key);
-  }
-
-  @Override
-  public void registered(final SelectionKey key)
-  {
-    super.registered(key);
-    discovery.advertise(service);
-  }
-
-  public enum Command
-  {
-    ECHO((byte)0),
-    SEEK((byte)1),
-    COMMITTED((byte)2),
-    CHECKPOINTED((byte)3),
-    CONNECTED((byte)4),
-    DISCONNECTED((byte)5),
-    WINDOWED((byte)6),
-    SERVER_ERROR((byte)7);
-
-    Command(byte b)
-    {
-      this.ord = b;
-    }
-
-    public byte getOrdinal()
-    {
-      return ord;
-    }
-
-    public static Command getCommand(byte b)
-    {
-      Command c;
-      switch (b) {
-        case 0:
-          c = ECHO;
-          break;
-
-        case 1:
-          c = SEEK;
-          break;
-
-        case 2:
-          c = COMMITTED;
-          break;
-
-        case 3:
-          c = CHECKPOINTED;
-          break;
-
-        case 4:
-          c = CONNECTED;
-          break;
-
-        case 5:
-          c = DISCONNECTED;
-          break;
-
-        case 6:
-          c = WINDOWED;
-          break;
-
-        case 7:
-          c = SERVER_ERROR;
-          break;
-
-        default:
-          throw new IllegalArgumentException(String.format("No Command defined for ordinal %b", b));
-      }
-
-      assert (b == c.ord);
-      return c;
-    }
-
-    private final byte ord;
-  }
-
-  public final ArrayList<Request> requests = new ArrayList<Request>(4);
-
-  @Override
-  public ClientListener getClientConnection(SocketChannel sc, ServerSocketChannel ssc)
-  {
-    Client lClient = new Client();
-    lClient.connected();
-    return lClient;
-  }
-
-  public class Client extends AbstractLengthPrependerClient
-  {
-
-    @Override
-    public void onMessage(byte[] buffer, int offset, int size)
-    {
-      if (size != Request.FIXED_SIZE) {
-        logger.warn("Invalid Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size),
-            key.channel());
-        return;
-      }
-
-      long requestTime = Server.readLong(buffer, offset + Request.TIME_OFFSET);
-      if (System.currentTimeMillis() > (requestTime + acceptedTolerance)) {
-        logger.warn("Expired Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size),
-            key.channel());
-        return;
-      }
-
-      try {
-        if (Command.getCommand(buffer[offset]) == Command.ECHO) {
-          write(buffer, offset, size);
-          return;
-        }
-      } catch (IllegalArgumentException ex) {
-        logger.warn("Invalid Request Received: {} from {}!", Arrays.copyOfRange(buffer, offset, offset + size),
-            key.channel(), ex);
-        return;
-      }
-
-      Request r = Request.getRequest(buffer, offset, this);
-      synchronized (requests) {
-        requests.add(r);
-      }
-    }
-
-    @Override
-    public void disconnected()
-    {
-      synchronized (requests) {
-        requests.add(Request.getRequest(
-            new byte[] {Command.DISCONNECTED.getOrdinal(), 0, 0, 0, 0, 0, 0, 0, 0}, 0, this));
-      }
-      super.disconnected();
-    }
-
-    public boolean write(byte[] address, Slice event)
-    {
-      if (event.offset == 0 && event.length == event.buffer.length) {
-        return write(address, event.buffer);
-      }
-
-      // a better method would be to replace the write implementation and allow it to natively support writing slices
-      return write(address, event.toByteArray());
-    }
-
-  }
-
-  public abstract static class Request
-  {
-    public static final int FIXED_SIZE = 17;
-    public static final int TIME_OFFSET = 9;
-    public final Command type;
-    public final Client client;
-
-    public Request(Command type, Client client)
-    {
-      this.type = type;
-      this.client = client;
-    }
-
-    public abstract Slice getAddress();
-
-    public abstract int getEventCount();
-
-    public abstract int getIdleCount();
-
-    @Override
-    public String toString()
-    {
-      return "Request{" + "type=" + type + '}';
-    }
-
-    public static Request getRequest(final byte[] buffer, final int offset, Client client)
-    {
-      Command command = Command.getCommand(buffer[offset]);
-      switch (command) {
-        case WINDOWED:
-          return new Request(Command.WINDOWED, client)
-          {
-            final int eventCount;
-            final int idleCount;
-
-            {
-              eventCount = Server.readInt(buffer, offset + 1);
-              idleCount = Server.readInt(buffer, offset + 5);
-            }
-
-            @Override
-            public Slice getAddress()
-            {
-              throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public int getEventCount()
-            {
-              return eventCount;
-            }
-
-            @Override
-            public int getIdleCount()
-            {
-              return idleCount;
-            }
-
-            @Override
-            public String toString()
-            {
-              return "Request{" + "type=" + type + ", eventCount=" + eventCount + ", idleCount=" + idleCount + '}';
-            }
-
-          };
-
-        default:
-          return new Request(command, client)
-          {
-            final Slice address;
-
-            {
-              address = new Slice(buffer, offset + 1, 8);
-            }
-
-            @Override
-            public Slice getAddress()
-            {
-              return address;
-            }
-
-            @Override
-            public int getEventCount()
-            {
-              throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public int getIdleCount()
-            {
-              throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public String toString()
-            {
-              return "Request{" + "type=" + type + ", address=" + address + '}';
-            }
-
-          };
-
-      }
-
-    }
-
-  }
-
-  public static int readInt(byte[] buffer, int offset)
-  {
-    return buffer[offset++] & 0xff
-           | (buffer[offset++] & 0xff) << 8
-           | (buffer[offset++] & 0xff) << 16
-           | (buffer[offset++] & 0xff) << 24;
-  }
-
-  public static void writeInt(byte[] buffer, int offset, int i)
-  {
-    buffer[offset++] = (byte)i;
-    buffer[offset++] = (byte)(i >>> 8);
-    buffer[offset++] = (byte)(i >>> 16);
-    buffer[offset++] = (byte)(i >>> 24);
-  }
-
-  public static long readLong(byte[] buffer, int offset)
-  {
-    return (long)buffer[offset++] & 0xff
-           | (long)(buffer[offset++] & 0xff) << 8
-           | (long)(buffer[offset++] & 0xff) << 16
-           | (long)(buffer[offset++] & 0xff) << 24
-           | (long)(buffer[offset++] & 0xff) << 32
-           | (long)(buffer[offset++] & 0xff) << 40
-           | (long)(buffer[offset++] & 0xff) << 48
-           | (long)(buffer[offset++] & 0xff) << 56;
-  }
-
-  public static void writeLong(byte[] buffer, int offset, long l)
-  {
-    buffer[offset++] = (byte)l;
-    buffer[offset++] = (byte)(l >>> 8);
-    buffer[offset++] = (byte)(l >>> 16);
-    buffer[offset++] = (byte)(l >>> 24);
-    buffer[offset++] = (byte)(l >>> 32);
-    buffer[offset++] = (byte)(l >>> 40);
-    buffer[offset++] = (byte)(l >>> 48);
-    buffer[offset++] = (byte)(l >>> 56);
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(Server.class);
-}
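
As context for the deleted Server class above, here is a minimal, self-contained sketch of the little-endian frame it expects on the wire: a 17-byte request (Request.FIXED_SIZE) holding one command byte, an 8-byte address, and an 8-byte timestamp starting at TIME_OFFSET (9), written and read with the same byte order as Server.writeLong/readLong. The class name and sample values below are illustrative only and are not part of this commit.

// Illustrative sketch only; mirrors the byte order of Server.writeLong/readLong.
public class WireFormatSketch
{
  static void writeLong(byte[] buffer, int offset, long l)
  {
    for (int i = 0; i < 8; i++) {
      buffer[offset + i] = (byte)(l >>> (8 * i)); // least significant byte first
    }
  }

  static long readLong(byte[] buffer, int offset)
  {
    long l = 0;
    for (int i = 0; i < 8; i++) {
      l |= ((long)buffer[offset + i] & 0xff) << (8 * i);
    }
    return l;
  }

  public static void main(String[] args)
  {
    byte[] frame = new byte[17];                      // Request.FIXED_SIZE
    frame[0] = 2;                                     // Command.COMMITTED ordinal
    writeLong(frame, 1, 42L);                         // hypothetical 8-byte address
    writeLong(frame, 9, System.currentTimeMillis());  // timestamp at Request.TIME_OFFSET
    System.out.println("command=" + frame[0]
        + " address=" + readLong(frame, 1)
        + " time=" + readLong(frame, 9));
  }
}

Client.onMessage drops frames whose timestamp is older than acceptedTolerance milliseconds, so the trailing 8 bytes are what keep stale requests out of the request queue.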

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java b/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java
deleted file mode 100644
index 72e1913..0000000
--- a/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.source;
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import javax.annotation.Nonnull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDrivenSource;
-import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.source.AbstractSource;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-/**
- * <p>TestSource class.</p>
- *
- * @since 0.9.4
- */
-public class HdfsTestSource extends AbstractSource implements EventDrivenSource, Configurable
-{
-  public static final String SOURCE_DIR = "sourceDir";
-  public static final String RATE = "rate";
-  public static final String INIT_DATE = "initDate";
-
-  static byte FIELD_SEPARATOR = 2;
-  public Timer emitTimer;
-  @Nonnull
-  String directory;
-  Path directoryPath;
-  int rate;
-  String initDate;
-  long initTime;
-  List<String> dataFiles;
-  long oneDayBack;
-
-  private transient BufferedReader br = null;
-  protected transient FileSystem fs;
-  private transient Configuration configuration;
-
-  private transient int currentFile = 0;
-  private transient boolean finished;
-  private List<Event> events;
-
-  public HdfsTestSource()
-  {
-    super();
-    this.rate = 2500;
-    dataFiles = Lists.newArrayList();
-    Calendar calendar = Calendar.getInstance();
-    calendar.add(Calendar.DATE, -1);
-    oneDayBack = calendar.getTimeInMillis();
-    configuration = new Configuration();
-    events = Lists.newArrayList();
-  }
-
-  @Override
-  public void configure(Context context)
-  {
-    directory = context.getString(SOURCE_DIR);
-    rate = context.getInteger(RATE, rate);
-    initDate = context.getString(INIT_DATE);
-
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(directory));
-    directoryPath = new Path(directory);
-
-    String[] parts = initDate.split("-");
-    Preconditions.checkArgument(parts.length == 3);
-    Calendar calendar = Calendar.getInstance();
-    calendar.set(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) - 1, Integer.parseInt(parts[2]), 0, 0, 0);
-    initTime = calendar.getTimeInMillis();
-
-    try {
-      List<String> files = findFiles();
-      for (String file : files) {
-        dataFiles.add(file);
-      }
-      if (logger.isDebugEnabled()) {
-        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
-        logger.debug("settings {} {} {} {} {}", directory, rate, dateFormat.format(oneDayBack),
-            dateFormat.format(new Date(initTime)), currentFile);
-        for (String file : dataFiles) {
-          logger.debug("settings add file {}", file);
-        }
-      }
-
-      fs = FileSystem.newInstance(new Path(directory).toUri(), configuration);
-      Path filePath = new Path(dataFiles.get(currentFile));
-      br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(fs.open(filePath))));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
-    finished = true;
-
-  }
-
-  private List<String> findFiles() throws IOException
-  {
-    List<String> files = Lists.newArrayList();
-    Path directoryPath = new Path(directory);
-    FileSystem lfs = FileSystem.newInstance(directoryPath.toUri(), configuration);
-    try {
-      logger.debug("checking for new files in {}", directoryPath);
-      RemoteIterator<LocatedFileStatus> statuses = lfs.listFiles(directoryPath, true);
-      for (; statuses.hasNext(); ) {
-        FileStatus status = statuses.next();
-        Path path = status.getPath();
-        String filePathStr = path.toString();
-        if (!filePathStr.endsWith(".gz")) {
-          continue;
-        }
-        logger.debug("new file {}", filePathStr);
-        files.add(path.toString());
-      }
-    } catch (FileNotFoundException e) {
-      logger.warn("Failed to list directory {}", directoryPath, e);
-      throw new RuntimeException(e);
-    } finally {
-      lfs.close();
-    }
-    return files;
-  }
-
-  @Override
-  public void start()
-  {
-    super.start();
-    emitTimer = new Timer();
-
-    final ChannelProcessor channelProcessor = getChannelProcessor();
-    emitTimer.scheduleAtFixedRate(new TimerTask()
-    {
-      @Override
-      public void run()
-      {
-        int lineCount = 0;
-        events.clear();
-        try {
-          while (lineCount < rate && !finished) {
-            String line = br.readLine();
-
-            if (line == null) {
-              logger.debug("completed file {}", currentFile);
-              br.close();
-              currentFile++;
-              if (currentFile == dataFiles.size()) {
-                logger.info("finished all files");
-                finished = true;
-                break;
-              }
-              Path filePath = new Path(dataFiles.get(currentFile));
-              br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(fs.open(filePath))));
-              logger.info("opening file {}. {}", currentFile, filePath);
-              continue;
-            }
-            lineCount++;
-            Event flumeEvent = EventBuilder.withBody(line.getBytes());
-            events.add(flumeEvent);
-          }
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-        if (events.size() > 0) {
-          channelProcessor.processEventBatch(events);
-        }
-        if (finished) {
-          emitTimer.cancel();
-        }
-      }
-
-    }, 0, 1000);
-  }
-
-  @Override
-  public void stop()
-  {
-    emitTimer.cancel();
-    super.stop();
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(HdfsTestSource.class);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/source/TestSource.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/source/TestSource.java b/flume/src/main/java/com/datatorrent/flume/source/TestSource.java
deleted file mode 100644
index 5773de3..0000000
--- a/flume/src/main/java/com/datatorrent/flume/source/TestSource.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.source;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import javax.annotation.Nonnull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDrivenSource;
-import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.source.AbstractSource;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * <p>TestSource class.</p>
- *
- * @since 0.9.4
- */
-public class TestSource extends AbstractSource implements EventDrivenSource, Configurable
-{
-  public static final String SOURCE_FILE = "sourceFile";
-  public static final String LINE_NUMBER = "lineNumber";
-  public static final String RATE = "rate";
-  public static final String PERCENT_PAST_EVENTS = "percentPastEvents";
-  static byte FIELD_SEPARATOR = 1;
-  static int DEF_PERCENT_PAST_EVENTS = 5;
-  public Timer emitTimer;
-  @Nonnull
-  String filePath;
-  int rate;
-  int numberOfPastEvents;
-  transient List<Row> cache;
-  private transient int startIndex;
-  private transient Random random;
-  private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
-  private SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
-  public TestSource()
-  {
-    super();
-    this.rate = 2500;
-    this.numberOfPastEvents = DEF_PERCENT_PAST_EVENTS * 25;
-    this.random = new Random();
-
-  }
-
-  @Override
-  public void configure(Context context)
-  {
-    filePath = context.getString(SOURCE_FILE);
-    rate = context.getInteger(RATE, rate);
-    int percentPastEvents = context.getInteger(PERCENT_PAST_EVENTS, DEF_PERCENT_PAST_EVENTS);
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(filePath));
-    try {
-      BufferedReader lineReader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
-      try {
-        buildCache(lineReader);
-      } finally {
-        lineReader.close();
-      }
-    } catch (FileNotFoundException e) {
-      throw new RuntimeException(e);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
-    if (DEF_PERCENT_PAST_EVENTS != percentPastEvents) {
-      numberOfPastEvents = (int)(percentPastEvents / 100.0 * cache.size());
-    }
-  }
-
-  @Override
-  public void start()
-  {
-    super.start();
-    emitTimer = new Timer();
-
-    final ChannelProcessor channel = getChannelProcessor();
-    final int cacheSize = cache.size();
-    emitTimer.scheduleAtFixedRate(new TimerTask()
-    {
-      @Override
-      public void run()
-      {
-        int lastIndex = startIndex + rate;
-        if (lastIndex > cacheSize) {
-          lastIndex -= cacheSize;
-          processBatch(channel, cache.subList(startIndex, cacheSize));
-          startIndex = 0;
-          while (lastIndex > cacheSize) {
-            processBatch(channel, cache);
-            lastIndex -= cacheSize;
-          }
-          processBatch(channel, cache.subList(0, lastIndex));
-        } else {
-          processBatch(channel, cache.subList(startIndex, lastIndex));
-        }
-        startIndex = lastIndex;
-      }
-
-    }, 0, 1000);
-  }
-
-  private void processBatch(ChannelProcessor channelProcessor, List<Row> rows)
-  {
-    if (rows.isEmpty()) {
-      return;
-    }
-
-    int noise = random.nextInt(numberOfPastEvents + 1);
-    Set<Integer> pastIndices = Sets.newHashSet();
-    for (int i = 0; i < noise; i++) {
-      pastIndices.add(random.nextInt(rows.size()));
-    }
-
-    Calendar calendar = Calendar.getInstance();
-    long high = calendar.getTimeInMillis();
-    calendar.add(Calendar.DATE, -2);
-    long low = calendar.getTimeInMillis();
-
-
-
-    List<Event> events = Lists.newArrayList();
-    for (int i = 0; i < rows.size(); i++) {
-      Row eventRow = rows.get(i);
-      if (pastIndices.contains(i)) {
-        long pastTime = (long)((Math.random() * (high - low)) + low);
-        byte[] pastDateField = dateFormat.format(pastTime).getBytes();
-        byte[] pastTimeField = timeFormat.format(pastTime).getBytes();
-
-        System.arraycopy(pastDateField, 0, eventRow.bytes, eventRow.dateFieldStart, pastDateField.length);
-        System.arraycopy(pastTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, pastTimeField.length);
-      } else {
-        calendar.setTimeInMillis(System.currentTimeMillis());
-        byte[] currentDateField = dateFormat.format(calendar.getTime()).getBytes();
-        byte[] currentTimeField = timeFormat.format(calendar.getTime()).getBytes();
-
-        System.arraycopy(currentDateField, 0, eventRow.bytes, eventRow.dateFieldStart, currentDateField.length);
-        System.arraycopy(currentTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, currentTimeField.length);
-      }
-
-      HashMap<String, String> headers = new HashMap<String, String>(2);
-      headers.put(SOURCE_FILE, filePath);
-      headers.put(LINE_NUMBER, String.valueOf(startIndex + i));
-      events.add(EventBuilder.withBody(eventRow.bytes, headers));
-    }
-    channelProcessor.processEventBatch(events);
-  }
-
-  @Override
-  public void stop()
-  {
-    emitTimer.cancel();
-    super.stop();
-  }
-
-  private void buildCache(BufferedReader lineReader) throws IOException
-  {
-    cache = Lists.newArrayListWithCapacity(rate);
-
-    String line;
-    while ((line = lineReader.readLine()) != null) {
-      byte[] row = line.getBytes();
-      Row eventRow = new Row(row);
-      final int rowsize = row.length;
-
-      /* guid */
-      int sliceLengh = -1;
-      while (++sliceLengh < rowsize) {
-        if (row[sliceLengh] == FIELD_SEPARATOR) {
-          break;
-        }
-      }
-      int recordStart = sliceLengh + 1;
-      int pointer = sliceLengh + 1;
-      while (pointer < rowsize) {
-        if (row[pointer++] == FIELD_SEPARATOR) {
-          eventRow.dateFieldStart = recordStart;
-          break;
-        }
-      }
-
-      /* lets parse the date */
-      int dateStart = pointer;
-      while (pointer < rowsize) {
-        if (row[pointer++] == FIELD_SEPARATOR) {
-          eventRow.timeFieldStart = dateStart;
-          break;
-        }
-      }
-
-      cache.add(eventRow);
-    }
-  }
-
-  private static class Row
-  {
-    final byte[] bytes;
-    int dateFieldStart;
-    int timeFieldStart;
-//    boolean past;
-
-    Row(byte[] bytes)
-    {
-      this.bytes = bytes;
-    }
-
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(TestSource.class);
-}
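
The timer task in TestSource.start() above emits a fixed number of cached rows per tick and wraps around the cache; the following minimal sketch (class and method names are illustrative, not part of the commit) isolates that index arithmetic so the wrap-around behaviour is easy to trace with small numbers.

import java.util.Arrays;
import java.util.List;

public class CircularBatchSketch
{
  // Mirrors the index arithmetic of the TimerTask in TestSource.start().
  static int emitOneTick(List<String> cache, int startIndex, int rate)
  {
    int cacheSize = cache.size();
    int lastIndex = startIndex + rate;
    if (lastIndex > cacheSize) {
      lastIndex -= cacheSize;
      process(cache.subList(startIndex, cacheSize));
      while (lastIndex > cacheSize) {        // rate can exceed the whole cache
        process(cache);
        lastIndex -= cacheSize;
      }
      process(cache.subList(0, lastIndex));
    } else {
      process(cache.subList(startIndex, lastIndex));
    }
    return lastIndex;                        // next tick's startIndex
  }

  static void process(List<String> rows)
  {
    System.out.println("emitting " + rows);
  }

  public static void main(String[] args)
  {
    List<String> cache = Arrays.asList("r0", "r1", "r2", "r3", "r4");
    int start = 0;
    for (int tick = 0; tick < 3; tick++) {
      start = emitOneTick(cache, start, 4);
    }
  }
}

With a cache of 5 rows and a rate of 4, successive ticks emit rows 0-3, then 4 plus 0-2, then 3-4 plus 0-1, which is how the source keeps the per-second event rate constant regardless of file length.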

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java b/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java
deleted file mode 100644
index da94154..0000000
--- a/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Context;
-import org.apache.flume.conf.Configurable;
-
-import com.datatorrent.api.Component;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * <p>DebugWrapper class.</p>
- *
- * @since 0.9.4
- */
-public class DebugWrapper implements Storage, Configurable, Component<com.datatorrent.api.Context>
-{
-  HDFSStorage storage = new HDFSStorage();
-
-  @Override
-  public byte[] store(Slice bytes)
-  {
-    byte[] ret = null;
-
-    try {
-      ret = storage.store(bytes);
-    } finally {
-      logger.debug("storage.store(new byte[]{{}});", bytes);
-    }
-
-    return ret;
-  }
-
-  @Override
-  public byte[] retrieve(byte[] identifier)
-  {
-    byte[] ret = null;
-
-    try {
-      ret = storage.retrieve(identifier);
-    } finally {
-      logger.debug("storage.retrieve(new byte[]{{}});", identifier);
-    }
-
-    return ret;
-  }
-
-  @Override
-  public byte[] retrieveNext()
-  {
-    byte[] ret = null;
-    try {
-      ret = storage.retrieveNext();
-    } finally {
-      logger.debug("storage.retrieveNext();");
-    }
-
-    return ret;
-  }
-
-  @Override
-  public void clean(byte[] identifier)
-  {
-    try {
-      storage.clean(identifier);
-    } finally {
-      logger.debug("storage.clean(new byte[]{{}});", identifier);
-    }
-  }
-
-  @Override
-  public void flush()
-  {
-    try {
-      storage.flush();
-    } finally {
-      logger.debug("storage.flush();");
-    }
-  }
-
-  @Override
-  public void configure(Context cntxt)
-  {
-    try {
-      storage.configure(cntxt);
-    } finally {
-      logger.debug("storage.configure({});", cntxt);
-    }
-  }
-
-  @Override
-  public void setup(com.datatorrent.api.Context t1)
-  {
-    try {
-      storage.setup(t1);
-    } finally {
-      logger.debug("storage.setup({});", t1);
-    }
-
-  }
-
-  @Override
-  public void teardown()
-  {
-    try {
-      storage.teardown();
-    } finally {
-      logger.debug("storage.teardown();");
-    }
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(DebugWrapper.class);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java b/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java
deleted file mode 100644
index 76f663c..0000000
--- a/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Event;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * <p>ErrorMaskingEventCodec class.</p>
- *
- * @since 1.0.4
- */
-public class ErrorMaskingEventCodec extends EventCodec
-{
-
-  @Override
-  public Object fromByteArray(Slice fragment)
-  {
-    try {
-      return super.fromByteArray(fragment);
-    } catch (RuntimeException re) {
-      logger.warn("Cannot deserialize event {}", fragment, re);
-    }
-
-    return null;
-  }
-
-  @Override
-  public Slice toByteArray(Event event)
-  {
-    try {
-      return super.toByteArray(event);
-    } catch (RuntimeException re) {
-      logger.warn("Cannot serialize event {}", event, re);
-    }
-
-    return null;
-  }
-
-
-  private static final Logger logger = LoggerFactory.getLogger(ErrorMaskingEventCodec.class);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java b/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java
deleted file mode 100644
index 0ece548..0000000
--- a/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * <p>EventCodec class.</p>
- *
- * @since 0.9.4
- */
-public class EventCodec implements StreamCodec<Event>
-{
-  private final transient Kryo kryo;
-
-  public EventCodec()
-  {
-    this.kryo = new Kryo();
-    this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
-  }
-
-  @Override
-  public Object fromByteArray(Slice fragment)
-  {
-    ByteArrayInputStream is = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length);
-    Input input = new Input(is);
-
-    @SuppressWarnings("unchecked")
-    HashMap<String, String> headers = kryo.readObjectOrNull(input, HashMap.class);
-    byte[] body = kryo.readObjectOrNull(input, byte[].class);
-    return EventBuilder.withBody(body, headers);
-  }
-
-  @Override
-  public Slice toByteArray(Event event)
-  {
-    ByteArrayOutputStream os = new ByteArrayOutputStream();
-    Output output = new Output(os);
-
-    Map<String, String> headers = event.getHeaders();
-    if (headers != null && headers.getClass() != HashMap.class) {
-      HashMap<String, String> tmp = new HashMap<String, String>(headers.size());
-      tmp.putAll(headers);
-      headers = tmp;
-    }
-    kryo.writeObjectOrNull(output, headers, HashMap.class);
-    kryo.writeObjectOrNull(output, event.getBody(), byte[].class);
-    output.flush();
-    final byte[] bytes = os.toByteArray();
-    return new Slice(bytes, 0, bytes.length);
-  }
-
-  @Override
-  public int getPartition(Event o)
-  {
-    return o.hashCode();
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(EventCodec.class);
-}
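
A minimal usage sketch of the EventCodec shown above: headers and body are serialized with Kryo's writeObjectOrNull and restored with EventBuilder.withBody, so a round trip preserves both. It assumes the pre-move com.datatorrent.flume.storage.EventCodec (or its relocated malhar equivalent) plus the flume, kryo, and netlet jars on the classpath; header keys and values are made up for illustration.

import java.util.HashMap;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import com.datatorrent.flume.storage.EventCodec;
import com.datatorrent.netlet.util.Slice;

public class EventCodecRoundTrip
{
  public static void main(String[] args)
  {
    EventCodec codec = new EventCodec();

    HashMap<String, String> headers = new HashMap<String, String>();
    headers.put("sourceFile", "/tmp/sample.log");   // hypothetical header

    Event original = EventBuilder.withBody("hello".getBytes(), headers);

    Slice serialized = codec.toByteArray(original);       // Kryo-encoded headers + body
    Event restored = (Event)codec.fromByteArray(serialized);

    System.out.println(new String(restored.getBody()));           // hello
    System.out.println(restored.getHeaders().get("sourceFile"));  // /tmp/sample.log
  }
}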

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java b/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java
deleted file mode 100644
index 4dcddcd..0000000
--- a/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java
+++ /dev/null
@@ -1,947 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import java.io.DataInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import javax.validation.constraints.NotNull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Context;
-import org.apache.flume.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
-
-import com.datatorrent.api.Component;
-import com.datatorrent.common.util.NameableThreadFactory;
-import com.datatorrent.flume.sink.Server;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * HDFSStorage is developed to store and retrieve the data from HDFS
- * <p />
- * The properties that can be set on HDFSStorage are: <br />
- * baseDir - The base directory where the data is going to be stored <br />
- * restore - This is used to restore the application from previous failure <br />
- * blockSize - The maximum size of the each file to created. <br />
- *
- * @since 0.9.3
- */
-public class HDFSStorage implements Storage, Configurable, Component<com.datatorrent.api.Context>
-{
-  public static final int DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
-  public static final String BASE_DIR_KEY = "baseDir";
-  public static final String RESTORE_KEY = "restore";
-  public static final String BLOCKSIZE = "blockSize";
-  public static final String BLOCK_SIZE_MULTIPLE = "blockSizeMultiple";
-  public static final String NUMBER_RETRY = "retryCount";
-
-  private static final String OFFSET_SUFFIX = "-offsetFile";
-  private static final String BOOK_KEEPING_FILE_OFFSET = "-bookKeepingOffsetFile";
-  private static final String FLUSHED_IDENTITY_FILE = "flushedCounter";
-  private static final String CLEAN_OFFSET_FILE = "cleanoffsetFile";
-  private static final String FLUSHED_IDENTITY_FILE_TEMP = "flushedCounter.tmp";
-  private static final String CLEAN_OFFSET_FILE_TEMP = "cleanoffsetFile.tmp";
-  private static final int IDENTIFIER_SIZE = 8;
-  private static final int DATA_LENGTH_BYTE_SIZE = 4;
-
-  /**
-   * Number of times the storage will try to get the filesystem
-   */
-  private int retryCount = 3;
-  /**
-   * The multiple of block size
-   */
-  private int blockSizeMultiple = 1;
-  /**
-   * Identifier for this storage.
-   */
-  @NotNull
-  private String id;
-  /**
-   * The baseDir where the storage facility is going to create files.
-   */
-  @NotNull
-  private String baseDir;
-  /**
-   * The block size to be used to create the storage files
-   */
-  private long blockSize;
-  /**
-   *
-   */
-  private boolean restore;
-  /**
-   * This identifies the current file number
-   */
-  private long currentWrittenFile;
-  /**
-   * This identifies the file number that has been flushed
-   */
-  private long flushedFileCounter;
-  /**
-   * The file that stores the fileCounter information
-   */
-  // private Path fileCounterFile;
-  /**
-   * The file that stores the flushed fileCounter information
-   */
-  private Path flushedCounterFile;
-  private Path flushedCounterFileTemp;
-  /**
-   * This identifies the last cleaned file number
-   */
-  private long cleanedFileCounter;
-  /**
-   * The file that stores the clean file counter information
-   */
-  // private Path cleanFileCounterFile;
-  /**
-   * The file that stores the clean file offset information
-   */
-  private Path cleanFileOffsetFile;
-  private Path cleanFileOffsetFileTemp;
-  private FileSystem fs;
-  private FSDataOutputStream dataStream;
-  ArrayList<DataBlock> files2Commit = new ArrayList<DataBlock>();
-  /**
-   * The offset in the current opened file
-   */
-  private long fileWriteOffset;
-  private FSDataInputStream readStream;
-  private long retrievalOffset;
-  private long retrievalFile;
-  private int offset;
-  private long flushedLong;
-  private long flushedFileWriteOffset;
-  private long bookKeepingFileOffset;
-  private byte[] cleanedOffset = new byte[8];
-  private long skipOffset;
-  private long skipFile;
-  private transient Path basePath;
-  private ExecutorService storageExecutor;
-  private byte[] currentData;
-  private FSDataInputStream nextReadStream;
-  private long nextFlushedLong;
-  private long nextRetrievalFile;
-  private byte[] nextRetrievalData;
-
-  public HDFSStorage()
-  {
-    this.restore = true;
-  }
-
-  /**
-   * This stores the Identifier information identified in the last store function call
-   *
-   * @param ctx
-   */
-  @Override
-  public void configure(Context ctx)
-  {
-    String tempId = ctx.getString(ID);
-    if (tempId == null) {
-      if (id == null) {
-        throw new IllegalArgumentException("id can't be  null.");
-      }
-    } else {
-      id = tempId;
-    }
-
-    String tempBaseDir = ctx.getString(BASE_DIR_KEY);
-    if (tempBaseDir != null) {
-      baseDir = tempBaseDir;
-    }
-
-    restore = ctx.getBoolean(RESTORE_KEY, restore);
-    Long tempBlockSize = ctx.getLong(BLOCKSIZE);
-    if (tempBlockSize != null) {
-      blockSize = tempBlockSize;
-    }
-    blockSizeMultiple = ctx.getInteger(BLOCK_SIZE_MULTIPLE, blockSizeMultiple);
-    retryCount = ctx.getInteger(NUMBER_RETRY,retryCount);
-  }
-
-  /**
-   * This function reads the file at a location and return the bytes stored in the file "
-   *
-   * @param path - the location of the file
-   * @return
-   * @throws IOException
-   */
-  byte[] readData(Path path) throws IOException
-  {
-    DataInputStream is = new DataInputStream(fs.open(path));
-    byte[] bytes = new byte[is.available()];
-    is.readFully(bytes);
-    is.close();
-    return bytes;
-  }
-
-  /**
-   * This function writes the bytes to a file specified by the path
-   *
-   * @param path the file location
-   * @param data the data to be written to the file
-   * @return
-   * @throws IOException
-   */
-  private FSDataOutputStream writeData(Path path, byte[] data) throws IOException
-  {
-    FSDataOutputStream fsOutputStream;
-    if (fs.getScheme().equals("file")) {
-      // local FS does not support hflush and does not flush native stream
-      fsOutputStream = new FSDataOutputStream(
-          new FileOutputStream(Path.getPathWithoutSchemeAndAuthority(path).toString()), null);
-    } else {
-      fsOutputStream = fs.create(path);
-    }
-    fsOutputStream.write(data);
-    return fsOutputStream;
-  }
-
-  private long calculateOffset(long fileOffset, long fileCounter)
-  {
-    return ((fileCounter << 32) | (fileOffset & 0xffffffffL));
-  }
-
-  @Override
-  public byte[] store(Slice slice)
-  {
-    // logger.debug("store message ");
-    int bytesToWrite = slice.length + DATA_LENGTH_BYTE_SIZE;
-    if (currentWrittenFile < skipFile) {
-      fileWriteOffset += bytesToWrite;
-      if (fileWriteOffset >= bookKeepingFileOffset) {
-        files2Commit.add(new DataBlock(null, bookKeepingFileOffset,
-            new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile));
-        currentWrittenFile++;
-        if (fileWriteOffset > bookKeepingFileOffset) {
-          fileWriteOffset = bytesToWrite;
-        } else {
-          fileWriteOffset = 0;
-        }
-        try {
-          bookKeepingFileOffset = getFlushedFileWriteOffset(
-              new Path(basePath, currentWrittenFile + BOOK_KEEPING_FILE_OFFSET));
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-      return null;
-    }
-
-    if (flushedFileCounter == currentWrittenFile && dataStream == null) {
-      currentWrittenFile++;
-      fileWriteOffset = 0;
-    }
-
-    if (flushedFileCounter == skipFile && skipFile != -1) {
-      skipFile++;
-    }
-
-    if (fileWriteOffset + bytesToWrite < blockSize) {
-      try {
-        /* write length and the actual data to the file */
-        if (fileWriteOffset == 0) {
-          // writeData(flushedCounterFile, String.valueOf(currentWrittenFile).getBytes()).close();
-          dataStream = writeData(new Path(basePath, String.valueOf(currentWrittenFile)),
-              Ints.toByteArray(slice.length));
-          dataStream.write(slice.buffer, slice.offset, slice.length);
-        } else {
-          dataStream.write(Ints.toByteArray(slice.length));
-          dataStream.write(slice.buffer, slice.offset, slice.length);
-        }
-        fileWriteOffset += bytesToWrite;
-
-        byte[] fileOffset = null;
-        if ((currentWrittenFile > skipFile) || (currentWrittenFile == skipFile && fileWriteOffset > skipOffset)) {
-          skipFile = -1;
-          fileOffset = new byte[IDENTIFIER_SIZE];
-          Server.writeLong(fileOffset, 0, calculateOffset(fileWriteOffset, currentWrittenFile));
-        }
-        return fileOffset;
-      } catch (IOException ex) {
-        logger.warn("Error while storing the bytes {}", ex.getMessage());
-        closeFs();
-        throw new RuntimeException(ex);
-      }
-    }
-    DataBlock db = new DataBlock(dataStream, fileWriteOffset,
-        new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile);
-    db.close();
-    files2Commit.add(db);
-    fileWriteOffset = 0;
-    ++currentWrittenFile;
-    return store(slice);
-  }
-
-  /**
-   * @param b
-   * @param startIndex
-   * @return
-   */
-  long byteArrayToLong(byte[] b, int startIndex)
-  {
-    final byte b1 = 0;
-    return Longs.fromBytes(b1, b1, b1, b1, b[3 + startIndex], b[2 + startIndex], b[1 + startIndex], b[startIndex]);
-  }
-
-  @Override
-  public byte[] retrieve(byte[] identifier)
-  {
-    skipFile = -1;
-    skipOffset = 0;
-    logger.debug("retrieve with address {}", Arrays.toString(identifier));
-    // flushing the last incomplete flushed file
-    closeUnflushedFiles();
-
-    retrievalOffset = byteArrayToLong(identifier, 0);
-    retrievalFile = byteArrayToLong(identifier, offset);
-
-    if (retrievalFile == 0 && retrievalOffset == 0 && currentWrittenFile == 0 && fileWriteOffset == 0) {
-      skipOffset = 0;
-      return null;
-    }
-
-    // making sure that the deleted address is not requested again
-    if (retrievalFile != 0 || retrievalOffset != 0) {
-      long cleanedFile = byteArrayToLong(cleanedOffset, offset);
-      if (retrievalFile < cleanedFile || (retrievalFile == cleanedFile &&
-          retrievalOffset < byteArrayToLong(cleanedOffset, 0))) {
-        logger.warn("The address asked has been deleted retrievalFile={}, cleanedFile={}, retrievalOffset={}, " +
-            "cleanedOffset={}", retrievalFile, cleanedFile, retrievalOffset, byteArrayToLong(cleanedOffset, 0));
-        closeFs();
-        throw new IllegalArgumentException(String.format("The data for address %s has already been deleted",
-            Arrays.toString(identifier)));
-      }
-    }
-
-    // we have just started
-    if (retrievalFile == 0 && retrievalOffset == 0) {
-      retrievalFile = byteArrayToLong(cleanedOffset, offset);
-      retrievalOffset = byteArrayToLong(cleanedOffset, 0);
-    }
-
-    if ((retrievalFile > flushedFileCounter)) {
-      skipFile = retrievalFile;
-      skipOffset = retrievalOffset;
-      retrievalFile = -1;
-      return null;
-    }
-    if ((retrievalFile == flushedFileCounter && retrievalOffset >= flushedFileWriteOffset)) {
-      skipFile = retrievalFile;
-      skipOffset = retrievalOffset - flushedFileWriteOffset;
-      retrievalFile = -1;
-      return null;
-    }
-
-    try {
-      if (readStream != null) {
-        readStream.close();
-        readStream = null;
-      }
-      Path path = new Path(basePath, String.valueOf(retrievalFile));
-      if (!fs.exists(path)) {
-        retrievalFile = -1;
-        closeFs();
-        throw new RuntimeException(String.format("File %s does not exist", path.toString()));
-      }
-
-      byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
-      flushedLong = Server.readLong(flushedOffset, 0);
-      while (retrievalOffset >= flushedLong && retrievalFile < flushedFileCounter) {
-        retrievalOffset -= flushedLong;
-        retrievalFile++;
-        flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
-        flushedLong = Server.readLong(flushedOffset, 0);
-      }
-
-      if (retrievalOffset >= flushedLong) {
-        logger.warn("data not flushed for the given identifier");
-        retrievalFile = -1;
-        return null;
-      }
-      synchronized (HDFSStorage.this) {
-        if (nextReadStream != null) {
-          nextReadStream.close();
-          nextReadStream = null;
-        }
-      }
-      currentData = null;
-      path = new Path(basePath, String.valueOf(retrievalFile));
-      //readStream = new FSDataInputStream(fs.open(path));
-      currentData = readData(path);
-      //readStream.seek(retrievalOffset);
-      storageExecutor.submit(getNextStream());
-      return retrieveHelper();
-    } catch (IOException e) {
-      closeFs();
-      throw new RuntimeException(e);
-    }
-  }
-
-  private byte[] retrieveHelper() throws IOException
-  {
-    int tempRetrievalOffset = (int)retrievalOffset;
-    int length = Ints.fromBytes(currentData[tempRetrievalOffset], currentData[tempRetrievalOffset + 1],
-        currentData[tempRetrievalOffset + 2], currentData[tempRetrievalOffset + 3]);
-    byte[] data = new byte[length + IDENTIFIER_SIZE];
-    System.arraycopy(currentData, tempRetrievalOffset + 4, data, IDENTIFIER_SIZE, length);
-    retrievalOffset += length + DATA_LENGTH_BYTE_SIZE;
-    if (retrievalOffset >= flushedLong) {
-      Server.writeLong(data, 0, calculateOffset(0, retrievalFile + 1));
-    } else {
-      Server.writeLong(data, 0, calculateOffset(retrievalOffset, retrievalFile));
-    }
-    return data;
-  }
-
-  @Override
-  public byte[] retrieveNext()
-  {
-    if (retrievalFile == -1) {
-      closeFs();
-      throw new RuntimeException("Call retrieve first");
-    }
-
-    if (retrievalFile > flushedFileCounter) {
-      logger.warn("data is not flushed");
-      return null;
-    }
-
-    try {
-      if (currentData == null) {
-        synchronized (HDFSStorage.this) {
-          if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) {
-            currentData = nextRetrievalData;
-            flushedLong = nextFlushedLong;
-            nextRetrievalData = null;
-          } else {
-            currentData = null;
-            currentData = readData(new Path(basePath, String.valueOf(retrievalFile)));
-            byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
-            flushedLong = Server.readLong(flushedOffset, 0);
-          }
-        }
-        storageExecutor.submit(getNextStream());
-      }
-
-      if (retrievalOffset >= flushedLong) {
-        retrievalFile++;
-        retrievalOffset = 0;
-
-        if (retrievalFile > flushedFileCounter) {
-          logger.warn("data is not flushed");
-          return null;
-        }
-
-        //readStream.close();
-        // readStream = new FSDataInputStream(fs.open(new Path(basePath, String.valueOf(retrievalFile))));
-        // byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
-        // flushedLong = Server.readLong(flushedOffset, 0);
-
-        synchronized (HDFSStorage.this) {
-          if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) {
-            currentData = nextRetrievalData;
-            flushedLong = nextFlushedLong;
-            nextRetrievalData = null;
-          } else {
-            currentData = null;
-            currentData = readData(new Path(basePath, String.valueOf(retrievalFile)));
-            byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
-            flushedLong = Server.readLong(flushedOffset, 0);
-          }
-        }
-        storageExecutor.submit(getNextStream());
-      }
-      //readStream.seek(retrievalOffset);
-      return retrieveHelper();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter")
-  public void clean(byte[] identifier)
-  {
-    logger.info("clean {}", Arrays.toString(identifier));
-    long cleanFileIndex = byteArrayToLong(identifier, offset);
-
-    long cleanFileOffset = byteArrayToLong(identifier, 0);
-    if (flushedFileCounter == -1) {
-      identifier = new byte[8];
-    } else if (cleanFileIndex > flushedFileCounter ||
-        (cleanFileIndex == flushedFileCounter && cleanFileOffset >= flushedFileWriteOffset)) {
-      // This is to make sure that we clean only the data that is flushed
-      cleanFileIndex = flushedFileCounter;
-      cleanFileOffset = flushedFileWriteOffset;
-      Server.writeLong(identifier, 0, calculateOffset(cleanFileOffset, cleanFileIndex));
-    }
-    cleanedOffset = identifier;
-
-    try {
-      writeData(cleanFileOffsetFileTemp, identifier).close();
-      fs.rename(cleanFileOffsetFileTemp, cleanFileOffsetFile);
-      if (cleanedFileCounter >= cleanFileIndex) {
-        return;
-      }
-      do {
-        Path path = new Path(basePath, String.valueOf(cleanedFileCounter));
-        if (fs.exists(path) && fs.isFile(path)) {
-          fs.delete(path, false);
-        }
-        path = new Path(basePath, cleanedFileCounter + OFFSET_SUFFIX);
-        if (fs.exists(path) && fs.isFile(path)) {
-          fs.delete(path, false);
-        }
-        path = new Path(basePath, cleanedFileCounter + BOOK_KEEPING_FILE_OFFSET);
-        if (fs.exists(path) && fs.isFile(path)) {
-          fs.delete(path, false);
-        }
-        logger.info("deleted file {}", cleanedFileCounter);
-        ++cleanedFileCounter;
-      } while (cleanedFileCounter < cleanFileIndex);
-      // writeData(cleanFileCounterFile, String.valueOf(cleanedFileCounter).getBytes()).close();
-
-    } catch (IOException e) {
-      logger.warn("not able to close the streams {}", e.getMessage());
-      closeFs();
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * This is used mainly for cleaning up of counter files created
-   */
-  void cleanHelperFiles()
-  {
-    try {
-      fs.delete(basePath, true);
-    } catch (IOException e) {
-      logger.warn(e.getMessage());
-    }
-  }
-
-  private void closeUnflushedFiles()
-  {
-    try {
-      files2Commit.clear();
-      // closing the stream
-      if (dataStream != null) {
-        dataStream.close();
-        dataStream = null;
-        // currentWrittenFile++;
-        // fileWriteOffset = 0;
-      }
-
-      if (!fs.exists(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX))) {
-        fs.delete(new Path(basePath, String.valueOf(currentWrittenFile)), false);
-      }
-
-      if (fs.exists(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX))) {
-        // This means that flush was called
-        flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX));
-        bookKeepingFileOffset = getFlushedFileWriteOffset(
-            new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET));
-      }
-
-      if (flushedFileCounter != -1) {
-        currentWrittenFile = flushedFileCounter;
-        fileWriteOffset = flushedFileWriteOffset;
-      } else {
-        currentWrittenFile = 0;
-        fileWriteOffset = 0;
-      }
-
-      flushedLong = 0;
-
-    } catch (IOException e) {
-      closeFs();
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void flush()
-  {
-    nextReadStream = null;
-    StringBuilder builder = new StringBuilder();
-    Iterator<DataBlock> itr = files2Commit.iterator();
-    DataBlock db;
-    try {
-      while (itr.hasNext()) {
-        db = itr.next();
-        db.updateOffsets();
-        builder.append(db.fileName).append(", ");
-      }
-      files2Commit.clear();
-
-      if (dataStream != null) {
-        dataStream.hflush();
-        writeData(flushedCounterFileTemp, String.valueOf(currentWrittenFile).getBytes()).close();
-        fs.rename(flushedCounterFileTemp, flushedCounterFile);
-        updateFlushedOffset(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), fileWriteOffset);
-        flushedFileWriteOffset = fileWriteOffset;
-        builder.append(currentWrittenFile);
-      }
-      logger.debug("flushed files {}", builder.toString());
-    } catch (IOException ex) {
-      logger.warn("not able to close the stream {}", ex.getMessage());
-      closeFs();
-      throw new RuntimeException(ex);
-    }
-    flushedFileCounter = currentWrittenFile;
-    // logger.debug("flushedFileCounter in flush {}",flushedFileCounter);
-  }
-
-  /**
-   * This updates the flushed offset
-   */
-  private void updateFlushedOffset(Path file, long bytesWritten)
-  {
-    byte[] lastStoredOffset = new byte[IDENTIFIER_SIZE];
-    Server.writeLong(lastStoredOffset, 0, bytesWritten);
-    try {
-      writeData(file, lastStoredOffset).close();
-    } catch (IOException e) {
-      try {
-        if (!Arrays.equals(readData(file), lastStoredOffset)) {
-          closeFs();
-          throw new RuntimeException(e);
-        }
-      } catch (Exception e1) {
-        closeFs();
-        throw new RuntimeException(e1);
-      }
-    }
-  }
-
-  public int getBlockSizeMultiple()
-  {
-    return blockSizeMultiple;
-  }
-
-  public void setBlockSizeMultiple(int blockSizeMultiple)
-  {
-    this.blockSizeMultiple = blockSizeMultiple;
-  }
-
-  /**
-   * @return the baseDir
-   */
-  public String getBaseDir()
-  {
-    return baseDir;
-  }
-
-  /**
-   * @param baseDir the baseDir to set
-   */
-  public void setBaseDir(String baseDir)
-  {
-    this.baseDir = baseDir;
-  }
-
-  /**
-   * @return the id
-   */
-  public String getId()
-  {
-    return id;
-  }
-
-  /**
-   * @param id the id to set
-   */
-  public void setId(String id)
-  {
-    this.id = id;
-  }
-
-  /**
-   * @return the blockSize
-   */
-  public long getBlockSize()
-  {
-    return blockSize;
-  }
-
-  /**
-   * @param blockSize the blockSize to set
-   */
-  public void setBlockSize(long blockSize)
-  {
-    this.blockSize = blockSize;
-  }
-
-  /**
-   * @return the restore
-   */
-  public boolean isRestore()
-  {
-    return restore;
-  }
-
-  /**
-   * @param restore the restore to set
-   */
-  public void setRestore(boolean restore)
-  {
-    this.restore = restore;
-  }
-
-  class DataBlock
-  {
-    FSDataOutputStream dataStream;
-    long dataOffset;
-    Path path2FlushedData;
-    long fileName;
-    private Path bookKeepingPath;
-
-    DataBlock(FSDataOutputStream stream, long bytesWritten, Path path2FlushedData, long fileName)
-    {
-      this.dataStream = stream;
-      this.dataOffset = bytesWritten;
-      this.path2FlushedData = path2FlushedData;
-      this.fileName = fileName;
-    }
-
-    public void close()
-    {
-      if (dataStream != null) {
-        try {
-          dataStream.close();
-          bookKeepingPath = new Path(basePath, fileName + BOOK_KEEPING_FILE_OFFSET);
-          updateFlushedOffset(bookKeepingPath, dataOffset);
-        } catch (IOException ex) {
-          logger.warn("not able to close the stream {}", ex.getMessage());
-          closeFs();
-          throw new RuntimeException(ex);
-        }
-      }
-    }
-
-    public void updateOffsets() throws IOException
-    {
-      updateFlushedOffset(path2FlushedData, dataOffset);
-      if (bookKeepingPath != null && fs.exists(bookKeepingPath)) {
-        fs.delete(bookKeepingPath, false);
-      }
-    }
-
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(HDFSStorage.class);
-
-  @Override
-  public void setup(com.datatorrent.api.Context context)
-  {
-    Configuration conf = new Configuration();
-    if (baseDir == null) {
-      baseDir = conf.get("hadoop.tmp.dir");
-      if (baseDir == null || baseDir.isEmpty()) {
-        throw new IllegalArgumentException("baseDir cannot be null.");
-      }
-    }
-    offset = 4;
-    skipOffset = -1;
-    skipFile = -1;
-    int tempRetryCount = 0;
-    while (tempRetryCount < retryCount && fs == null) {
-      tempRetryCount++;
-      try {
-        fs = FileSystem.newInstance(conf);
-      } catch (Throwable throwable) {
-        logger.warn("Unable to get the file system, will retry", throwable);
-      }
-    }
-
-    try {
-      Path path = new Path(baseDir);
-      basePath = new Path(path, id);
-      if (fs == null) {
-        fs = FileSystem.newInstance(conf);
-      }
-      if (!fs.exists(path)) {
-        closeFs();
-        throw new RuntimeException(String.format("baseDir passed (%s) doesn't exist.", baseDir));
-      }
-      if (!fs.isDirectory(path)) {
-        closeFs();
-        throw new RuntimeException(String.format("baseDir passed (%s) is not a directory.", baseDir));
-      }
-      if (!restore) {
-        fs.delete(basePath, true);
-      }
-      if (!fs.exists(basePath) || !fs.isDirectory(basePath)) {
-        fs.mkdirs(basePath);
-      }
-
-      if (blockSize == 0) {
-        blockSize = fs.getDefaultBlockSize(new Path(basePath, "tempData"));
-      }
-      if (blockSize == 0) {
-        blockSize = DEFAULT_BLOCK_SIZE;
-      }
-
-      blockSize = blockSizeMultiple * blockSize;
-
-      currentWrittenFile = 0;
-      cleanedFileCounter = -1;
-      retrievalFile = -1;
-      // fileCounterFile = new Path(basePath, IDENTITY_FILE);
-      flushedFileCounter = -1;
-      // cleanFileCounterFile = new Path(basePath, CLEAN_FILE);
-      cleanFileOffsetFile = new Path(basePath, CLEAN_OFFSET_FILE);
-      cleanFileOffsetFileTemp = new Path(basePath, CLEAN_OFFSET_FILE_TEMP);
-      flushedCounterFile = new Path(basePath, FLUSHED_IDENTITY_FILE);
-      flushedCounterFileTemp = new Path(basePath, FLUSHED_IDENTITY_FILE_TEMP);
-
-      if (restore) {
-        //
-        // if (fs.exists(fileCounterFile) && fs.isFile(fileCounterFile)) {
-        // //currentWrittenFile = Long.valueOf(new String(readData(fileCounterFile)));
-        // }
-
-        if (fs.exists(cleanFileOffsetFile) && fs.isFile(cleanFileOffsetFile)) {
-          cleanedOffset = readData(cleanFileOffsetFile);
-        }
-
-        if (fs.exists(flushedCounterFile) && fs.isFile(flushedCounterFile)) {
-          String strFlushedFileCounter = new String(readData(flushedCounterFile));
-          if (strFlushedFileCounter.isEmpty()) {
-            logger.warn("empty flushed file");
-          } else {
-            flushedFileCounter = Long.valueOf(strFlushedFileCounter);
-            flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX));
-            bookKeepingFileOffset = getFlushedFileWriteOffset(
-                new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET));
-          }
-
-        }
-      }
-      fileWriteOffset = flushedFileWriteOffset;
-      currentWrittenFile = flushedFileCounter;
-      cleanedFileCounter = byteArrayToLong(cleanedOffset, offset) - 1;
-      if (currentWrittenFile == -1) {
-        ++currentWrittenFile;
-        fileWriteOffset = 0;
-      }
-
-    } catch (IOException io) {
-      throw new RuntimeException(io);
-    }
-    storageExecutor = Executors.newSingleThreadExecutor(new NameableThreadFactory("StorageHelper"));
-  }
-
-  private void closeFs()
-  {
-    if (fs != null) {
-      try {
-        fs.close();
-        fs = null;
-      } catch (IOException e) {
-        logger.debug(e.getMessage());
-      }
-    }
-  }
-
-  private long getFlushedFileWriteOffset(Path filePath) throws IOException
-  {
-    if (flushedFileCounter != -1 && fs.exists(filePath)) {
-      byte[] flushedFileOffsetByte = readData(filePath);
-      if (flushedFileOffsetByte != null && flushedFileOffsetByte.length == 8) {
-        return Server.readLong(flushedFileOffsetByte, 0);
-      }
-    }
-    return 0;
-  }
-
-  @Override
-  public void teardown()
-  {
-    logger.debug("called teardown");
-    try {
-      if (readStream != null) {
-        readStream.close();
-      }
-      synchronized (HDFSStorage.this) {
-        if (nextReadStream != null) {
-          nextReadStream.close();
-        }
-      }
-
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } finally {
-      closeUnflushedFiles();
-      storageExecutor.shutdown();
-    }
-
-  }
-
-  private Runnable getNextStream()
-  {
-    return new Runnable()
-    {
-      @Override
-      public void run()
-      {
-        try {
-          synchronized (HDFSStorage.this) {
-            nextRetrievalFile = retrievalFile + 1;
-            if (nextRetrievalFile > flushedFileCounter) {
-              nextRetrievalData = null;
-              return;
-            }
-            Path path = new Path(basePath, String.valueOf(nextRetrievalFile));
-            Path offsetPath = new Path(basePath, nextRetrievalFile + OFFSET_SUFFIX);
-            nextRetrievalData = null;
-            nextRetrievalData = readData(path);
-            byte[] flushedOffset = readData(offsetPath);
-            nextFlushedLong = Server.readLong(flushedOffset, 0);
-          }
-        } catch (Throwable e) {
-          logger.warn("in storage executor ", e);
-
-        }
-      }
-    };
-  }
-
-}
-
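For orientation, the offset bookkeeping in the removed HDFSStorage above boils down to small files that each hold a single 8-byte long, written by updateFlushedOffset() and read back by getFlushedFileWriteOffset(). The sketch below shows an equivalent pattern with plain Hadoop stream calls; the class and method names are illustrative assumptions, and the byte order of the real Server.writeLong/readLong helpers is not asserted here.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: a *.offset style file holding one 8-byte value.
final class OffsetFileSketch
{
  // Overwrite the small offset file with the latest flushed position.
  static void writeOffset(FileSystem fs, Path file, long value) throws IOException
  {
    try (FSDataOutputStream out = fs.create(file, true)) {
      out.writeLong(value); // 8 bytes, DataOutput (big-endian) encoding
    }
  }

  // Read the previously recorded position; 0 if the file is absent.
  static long readOffset(FileSystem fs, Path file) throws IOException
  {
    if (!fs.exists(file)) {
      return 0;
    }
    try (FSDataInputStream in = fs.open(file)) {
      return in.readLong(); // must match the writeLong above
    }
  }
}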

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/Storage.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/Storage.java b/flume/src/main/java/com/datatorrent/flume/storage/Storage.java
deleted file mode 100644
index 5130f3c..0000000
--- a/flume/src/main/java/com/datatorrent/flume/storage/Storage.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * <p>Storage interface.</p>
- *
- * @since 0.9.2
- */
-public interface Storage
-{
-  /**
-   * Key in the context for the unique identifier of the storage, which may be used to recover from failure.
-   */
-  String ID = "id";
-
-  /**
-   * Stores the given bytes and returns the unique identifier that can be used to retrieve them.
-   *
-   * @param bytes the data to store
-   * @return identifier of the stored data
-   */
-  byte[] store(Slice bytes);
-
-  /**
-   * Returns the data bytes for the given identifier along with the identifier for the next data bytes. <br/>
-   * The first eight bytes of the returned array contain the identifier and the remaining bytes contain the data.
-   *
-   * @param identifier identifier of the data to retrieve
-   * @return identifier prefix followed by the data bytes
-   */
-  byte[] retrieve(byte[] identifier);
-
-  /**
-   * Returns the next data bytes together with the identifier for the data that follows. Which data is returned is
-   * determined by the preceding retrieve() call and the number of retrieveNext() calls made since then. <br/>
-   * The first eight bytes of the returned array contain the identifier and the remaining bytes contain the data.
-   *
-   * @return identifier prefix followed by the data bytes
-   */
-  byte[] retrieveNext();
-
-  /**
-   * Cleans up the stored files identified by the given identifier.
-   *
-   * @param identifier identifier of the data to clean up
-   */
-  void clean(byte[] identifier);
-
-  /**
-   * Flushes any buffered data to the underlying storage.
-   *
-   */
-  void flush();
-
-}
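To make the contract above concrete, here is a minimal sketch of how a consumer might drive a Storage implementation. It assumes, per the javadoc, that the first eight bytes of each returned array are the identifier and the rest is data, and it further assumes that an all-zero identifier means "start of the stored data" and that retrieve()/retrieveNext() return null when nothing more is available; the class name and the println output are illustrative only.

import java.util.Arrays;

import com.datatorrent.netlet.util.Slice;

// Illustrative only: exercises store/flush/retrieve/retrieveNext/clean.
final class StorageUsageSketch
{
  static void roundTrip(Storage storage, byte[] payload)
  {
    // Store one chunk and make it visible to readers.
    storage.store(new Slice(payload, 0, payload.length));
    storage.flush();

    byte[] start = new byte[8];                 // all-zero identifier = beginning (assumption)
    byte[] chunk = storage.retrieve(start);
    byte[] lastIdentifier = start;
    while (chunk != null) {                     // null assumed to mean "nothing more to read"
      lastIdentifier = Arrays.copyOfRange(chunk, 0, 8);           // identifier prefix
      byte[] data = Arrays.copyOfRange(chunk, 8, chunk.length);   // actual data bytes
      System.out.println("retrieved " + data.length + " bytes");
      chunk = storage.retrieveNext();
    }
    storage.clean(lastIdentifier);              // discard what has been consumed
  }
}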

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java
new file mode 100644
index 0000000..619a625
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.discovery;
+
+import java.util.Collection;
+
+/**
+ * When a DTFlumeSink server instance binds to a network interface, it can publish
+ * its whereabouts by invoking the advertise method on the Discovery object. Similarly,
+ * when it stops accepting new connections, it can announce that by invoking
+ * unadvertise.<p />
+ * Interested parties can call the discover method to get the list of addresses where
+ * they can find an available DTFlumeSink server instance.
+ *
+ * @param <T> - Type of the objects which can be discovered
+ * @since 0.9.3
+ */
+public interface Discovery<T>
+{
+  /**
+   * Withdraw the previously published address because it is no longer valid.
+   *
+   * @param service the service whose address should no longer be advertised
+   */
+  void unadvertise(Service<T> service);
+
+  /**
+   * Advertise the host/port address where DTFlumeSink is accepting client connections.
+   *
+   * @param service the service whose address should be advertised
+   */
+  void advertise(Service<T> service);
+
+  /**
+   * Discover all the addresses that are actively accepting client connections.
+   *
+   * @return active server addresses that can accept connections
+   */
+  Collection<Service<T>> discover();
+
+  interface Service<T>
+  {
+    String getHost();
+
+    int getPort();
+
+    T getPayload();
+
+    String getId();
+
+  }
+
+}
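As a mental model for the interface above, here is a minimal in-memory Discovery sketch; it is not part of this commit, which instead adds the ZooKeeper-assisted implementation in the next file. The class name is illustrative, it is assumed to live alongside the Discovery interface (same package), and unadvertise relies on the Service objects having a sensible equals().

import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative only: keeps advertised services in memory, no persistence or watches.
public class InMemoryDiscovery<T> implements Discovery<T>
{
  private final Collection<Service<T>> services = new CopyOnWriteArrayList<Service<T>>();

  @Override
  public void advertise(Service<T> service)
  {
    services.add(service);
  }

  @Override
  public void unadvertise(Service<T> service)
  {
    services.remove(service); // relies on Service equals()/identity
  }

  @Override
  public Collection<Service<T>> discover()
  {
    return services;
  }
}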

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java
new file mode 100644
index 0000000..9a7dd3c
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.discovery;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import javax.validation.constraints.NotNull;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.InstanceSerializer;
+import org.apache.flume.conf.Configurable;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Component;
+
+/**
+ * <p>ZKAssistedDiscovery class.</p>
+ *
+ * @since 0.9.3
+ */
+public class ZKAssistedDiscovery implements Discovery<byte[]>,
+    Component<com.datatorrent.api.Context>, Configurable, Serializable
+{
+  @NotNull
+  private String serviceName;
+  @NotNull
+  private String connectionString;
+  @NotNull
+  private String basePath;
+  private int connectionTimeoutMillis;
+  private int connectionRetryCount;
+  private int conntectionRetrySleepMillis;
+  private transient InstanceSerializerFactory instanceSerializerFactory;
+  private transient CuratorFramework curatorFramework;
+  private transient ServiceDiscovery<byte[]> discovery;
+
+  public ZKAssistedDiscovery()
+  {
+    this.serviceName = "DTFlume";
+    this.conntectionRetrySleepMillis = 500;
+    this.connectionRetryCount = 10;
+    this.connectionTimeoutMillis = 1000;
+  }
+
+  @Override
+  public void unadvertise(Service<byte[]> service)
+  {
+    doAdvertise(service, false);
+  }
+
+  @Override
+  public void advertise(Service<byte[]> service)
+  {
+    doAdvertise(service, true);
+  }
+
+  public void doAdvertise(Service<byte[]> service, boolean flag)
+  {
+    try {
+      new EnsurePath(basePath).ensure(curatorFramework.getZookeeperClient());
+
+      ServiceInstance<byte[]> instance = getInstance(service);
+      if (flag) {
+        discovery.registerService(instance);
+      } else {
+        discovery.unregisterService(instance);
+      }
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public Collection<Service<byte[]>> discover()
+  {
+    try {
+      new EnsurePath(basePath).ensure(curatorFramework.getZookeeperClient());
+
+      Collection<ServiceInstance<byte[]>> services = discovery.queryForInstances(serviceName);
+      ArrayList<Service<byte[]>> returnable = new ArrayList<Service<byte[]>>(services.size());
+      for (final ServiceInstance<byte[]> service : services) {
+        returnable.add(new Service<byte[]>()
+        {
+          @Override
+          public String getHost()
+          {
+            return service.getAddress();
+          }
+
+          @Override
+          public int getPort()
+          {
+            return service.getPort();
+          }
+
+          @Override
+          public byte[] getPayload()
+          {
+            return service.getPayload();
+          }
+
+          @Override
+          public String getId()
+          {
+            return service.getId();
+          }
+
+          @Override
+          public String toString()
+          {
+            return "{" + getId() + " => " + getHost() + ':' + getPort() + '}';
+          }
+
+        });
+      }
+      return returnable;
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public String toString()
+  {
+    return "ZKAssistedDiscovery{" + "serviceName=" + serviceName + ", connectionString=" + connectionString +
+        ", basePath=" + basePath + ", connectionTimeoutMillis=" + connectionTimeoutMillis + ", connectionRetryCount=" +
+        connectionRetryCount + ", conntectionRetrySleepMillis=" + conntectionRetrySleepMillis + '}';
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int hash = 7;
+    hash = 47 * hash + this.serviceName.hashCode();
+    hash = 47 * hash + this.connectionString.hashCode();
+    hash = 47 * hash + this.basePath.hashCode();
+    hash = 47 * hash + this.connectionTimeoutMillis;
+    hash = 47 * hash + this.connectionRetryCount;
+    hash = 47 * hash + this.conntectionRetrySleepMillis;
+    return hash;
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    final ZKAssistedDiscovery other = (ZKAssistedDiscovery)obj;
+    if (!this.serviceName.equals(other.serviceName)) {
+      return false;
+    }
+    if (!this.connectionString.equals(other.connectionString)) {
+      return false;
+    }
+    if (!this.basePath.equals(other.basePath)) {
+      return false;
+    }
+    if (this.connectionTimeoutMillis != other.connectionTimeoutMillis) {
+      return false;
+    }
+    if (this.connectionRetryCount != other.connectionRetryCount) {
+      return false;
+    }
+    if (this.conntectionRetrySleepMillis != other.conntectionRetrySleepMillis) {
+      return false;
+    }
+    return true;
+  }
+
+  ServiceInstance<byte[]> getInstance(Service<byte[]> service) throws Exception
+  {
+    return ServiceInstance.<byte[]>builder()
+            .name(serviceName)
+            .address(service.getHost())
+            .port(service.getPort())
+            .id(service.getId())
+            .payload(service.getPayload())
+            .build();
+  }
+
+  private ServiceDiscovery<byte[]> getDiscovery(CuratorFramework curatorFramework)
+  {
+    return ServiceDiscoveryBuilder.builder(byte[].class)
+            .basePath(basePath)
+            .client(curatorFramework)
+            .serializer(instanceSerializerFactory.getInstanceSerializer(
+                new TypeReference<ServiceInstance<byte[]>>() {}))
+            .build();
+  }
+
+  /**
+   * @return the instanceSerializerFactory
+   */
+  InstanceSerializerFactory getInstanceSerializerFactory()
+  {
+    return instanceSerializerFactory;
+  }
+
+  /**
+   * @return the connectionString
+   */
+  public String getConnectionString()
+  {
+    return connectionString;
+  }
+
+  /**
+   * @param connectionString the connectionString to set
+   */
+  public void setConnectionString(String connectionString)
+  {
+    this.connectionString = connectionString;
+  }
+
+  /**
+   * @return the basePath
+   */
+  public String getBasePath()
+  {
+    return basePath;
+  }
+
+  /**
+   * @param basePath the basePath to set
+   */
+  public void setBasePath(String basePath)
+  {
+    this.basePath = basePath;
+  }
+
+  /**
+   * @return the connectionTimeoutMillis
+   */
+  public int getConnectionTimeoutMillis()
+  {
+    return connectionTimeoutMillis;
+  }
+
+  /**
+   * @param connectionTimeoutMillis the connectionTimeoutMillis to set
+   */
+  public void setConnectionTimeoutMillis(int connectionTimeoutMillis)
+  {
+    this.connectionTimeoutMillis = connectionTimeoutMillis;
+  }
+
+  /**
+   * @return the connectionRetryCount
+   */
+  public int getConnectionRetryCount()
+  {
+    return connectionRetryCount;
+  }
+
+  /**
+   * @param connectionRetryCount the connectionRetryCount to set
+   */
+  public void setConnectionRetryCount(int connectionRetryCount)
+  {
+    this.connectionRetryCount = connectionRetryCount;
+  }
+
+  /**
+   * @return the conntectionRetrySleepMillis
+   */
+  public int getConntectionRetrySleepMillis()
+  {
+    return conntectionRetrySleepMillis;
+  }
+
+  /**
+   * @param conntectionRetrySleepMillis the conntectionRetrySleepMillis to set
+   */
+  public void setConntectionRetrySleepMillis(int conntectionRetrySleepMillis)
+  {
+    this.conntectionRetrySleepMillis = conntectionRetrySleepMillis;
+  }
+
+  /**
+   * @return the serviceName
+   */
+  public String getServiceName()
+  {
+    return serviceName;
+  }
+
+  /**
+   * @param serviceName the serviceName to set
+   */
+  public void setServiceName(String serviceName)
+  {
+    this.serviceName = serviceName;
+  }
+
+  @Override
+  public void configure(org.apache.flume.Context context)
+  {
+    serviceName = context.getString("serviceName", "DTFlume");
+    connectionString = context.getString("connectionString");
+    basePath = context.getString("basePath");
+
+    connectionTimeoutMillis = context.getInteger("connectionTimeoutMillis", 1000);
+    connectionRetryCount = context.getInteger("connectionRetryCount", 10);
+    conntectionRetrySleepMillis = context.getInteger("connectionRetrySleepMillis", 500);
+  }
+
+  @Override
+  public void setup(com.datatorrent.api.Context context)
+  {
+    ObjectMapper om = new ObjectMapper();
+    instanceSerializerFactory = new InstanceSerializerFactory(om.reader(), om.writer());
+
+    curatorFramework = CuratorFrameworkFactory.builder()
+            .connectionTimeoutMs(connectionTimeoutMillis)
+            .retryPolicy(new RetryNTimes(connectionRetryCount, conntectionRetrySleepMillis))
+            .connectString(connectionString)
+            .build();
+    curatorFramework.start();
+
+    discovery = getDiscovery(curatorFramework);
+    try {
+      discovery.start();
+    } catch (Exception ex) {
+      throw Throwables.propagate(ex);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    try {
+      discovery.close();
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    } finally {
+      curatorFramework.close();
+      curatorFramework = null;
+    }
+  }
+
+  public class InstanceSerializerFactory
+  {
+    private final ObjectReader objectReader;
+    private final ObjectWriter objectWriter;
+
+    InstanceSerializerFactory(ObjectReader objectReader, ObjectWriter objectWriter)
+    {
+      this.objectReader = objectReader;
+      this.objectWriter = objectWriter;
+    }
+
+    public <T> InstanceSerializer<T> getInstanceSerializer(
+        TypeReference<ServiceInstance<T>> typeReference)
+    {
+      return new JacksonInstanceSerializer<T>(objectReader, objectWriter, typeReference);
+    }
+
+    final class JacksonInstanceSerializer<T> implements InstanceSerializer<T>
+    {
+      private final TypeReference<ServiceInstance<T>> typeRef;
+      private final ObjectWriter objectWriter;
+      private final ObjectReader objectReader;
+
+      JacksonInstanceSerializer(ObjectReader objectReader, ObjectWriter objectWriter,
+          TypeReference<ServiceInstance<T>> typeRef)
+      {
+        this.objectReader = objectReader;
+        this.objectWriter = objectWriter;
+        this.typeRef = typeRef;
+      }
+
+      @Override
+      public ServiceInstance<T> deserialize(byte[] bytes) throws Exception
+      {
+        return objectReader.withType(typeRef).readValue(bytes);
+      }
+
+      @Override
+      public byte[] serialize(ServiceInstance<T> serviceInstance) throws Exception
+      {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        objectWriter.writeValue(out, serviceInstance);
+        return out.toByteArray();
+      }
+
+    }
+
+  }
+
+  private static final long serialVersionUID = 201401221145L;
+  private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscovery.class);
+}
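A minimal sketch of wiring ZKAssistedDiscovery up programmatically, based only on the setters and lifecycle methods shown above; the ZooKeeper connect string and base path are placeholders, the class name is illustrative, and passing null for the context relies on setup() not using it (as in the code above).

import org.apache.apex.malhar.flume.discovery.Discovery;
import org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery;

// Illustrative only; connection string and base path are placeholders.
public final class DiscoverySketch
{
  public static void main(String[] args)
  {
    ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
    discovery.setConnectionString("localhost:2181");   // placeholder ZooKeeper ensemble
    discovery.setBasePath("/flume/discovery");         // placeholder znode base path
    discovery.setup(null);                              // context is not used by setup() above
    try {
      for (Discovery.Service<byte[]> service : discovery.discover()) {
        System.out.println(service);                    // prints "{id => host:port}"
      }
    } finally {
      discovery.teardown();
    }
  }
}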