Posted to commits@apex.apache.org by hs...@apache.org on 2015/11/30 22:28:26 UTC

[1/7] incubator-apex-malhar git commit: Rename HDS to HDHT.

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 e24c14c3e -> 333a70733


Rename HDS to HDHT.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/4e47d236
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/4e47d236
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/4e47d236

Branch: refs/heads/devel-3
Commit: 4e47d236cbd8a1ca3c6dd96235cd588a7eb1d2e6
Parents: 217f8db
Author: thomas <th...@datatorrent.com>
Authored: Mon Dec 8 15:22:57 2014 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 23 21:37:01 2015 -0800

----------------------------------------------------------------------
 AbstractSinglePortHDSWriter.java | 194 ++++++++++++++++++++++++++++++++++
 HDHTFileAccess.java              | 122 +++++++++++++++++++++
 HDHTFileAccessFSImpl.java        | 125 ++++++++++++++++++++++
 tfile/DTFileReader.java          | 110 +++++++++++++++++++
 tfile/TFileImpl.java             | 176 ++++++++++++++++++++++++++++++
 tfile/TFileReader.java           | 110 +++++++++++++++++++
 tfile/TFileWriter.java           |  55 ++++++++++
 7 files changed, 892 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/AbstractSinglePortHDSWriter.java
----------------------------------------------------------------------
diff --git a/AbstractSinglePortHDSWriter.java b/AbstractSinglePortHDSWriter.java
new file mode 100644
index 0000000..04fa602
--- /dev/null
+++ b/AbstractSinglePortHDSWriter.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.common.util.Slice;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
+
+/**
+ * Operator that receives data on its input port and writes it to the data store.
+ * Implements partitioning and maps the partition key to the store bucket.
+ * The derived class supplies the codec for partitioning and key-value serialization.
+ * @param <EVENT>
+ */
+public abstract class AbstractSinglePortHDSWriter<EVENT> extends HDHTWriter implements Partitioner<AbstractSinglePortHDSWriter<EVENT>>
+{
+  public interface HDSCodec<EVENT> extends StreamCodec<EVENT>
+  {
+    byte[] getKeyBytes(EVENT event);
+    byte[] getValueBytes(EVENT event);
+    EVENT fromKeyValue(Slice key, byte[] value);
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractSinglePortHDSWriter.class);
+
+  protected int partitionMask;
+
+  protected Set<Integer> partitions;
+
+  protected transient HDSCodec<EVENT> codec;
+
+  @Min(1)
+  private int partitionCount = 1;
+
+  public final transient DefaultInputPort<EVENT> input = new DefaultInputPort<EVENT>()
+  {
+    @Override
+    public void process(EVENT event)
+    {
+      try {
+        processEvent(event);
+      } catch (IOException e) {
+        throw new RuntimeException("Error processing " + event, e);
+      }
+    }
+
+    @Override
+    public StreamCodec<EVENT> getStreamCodec()
+    {
+      return getCodec();
+    }
+  };
+
+  public void setPartitionCount(int partitionCount)
+  {
+    this.partitionCount = partitionCount;
+  }
+
+  public int getPartitionCount()
+  {
+    return partitionCount;
+  }
+
+  /**
+   * Storage bucket for the given event. Only one partition can write to a storage bucket and by default it is
+   * identified by the partition id.
+   *
+   * @param event
+   * @return The bucket key.
+   */
+  protected long getBucketKey(EVENT event)
+  {
+    return (codec.getPartition(event) & partitionMask);
+  }
+
+  protected void processEvent(EVENT event) throws IOException
+  {
+    byte[] key = codec.getKeyBytes(event);
+    byte[] value = codec.getValueBytes(event);
+    super.put(getBucketKey(event), new Slice(key), value);
+  }
+
+  abstract protected HDSCodec<EVENT> getCodec();
+
+  @Override
+  public void setup(OperatorContext arg0)
+  {
+    LOG.debug("Store {} with partitions {}", super.getFileStore(), new PartitionKeys(this.partitionMask, this.partitions));
+    super.setup(arg0);
+    try {
+      this.codec = getCodec();
+      // inject the operator reference, if such field exists
+      // TODO: replace with broader solution
+      Class<?> cls = this.codec.getClass();
+      while (cls != null) {
+        for (Field field : cls.getDeclaredFields()) {
+          if (field.getType().isAssignableFrom(this.getClass())) {
+            field.setAccessible(true);
+            field.set(this.codec, this);
+          }
+        }
+        cls = cls.getSuperclass();
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create codec", e);
+    }
+  }
+
+  @Override
+  public Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> definePartitions(Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> partitions, int incrementalCapacity)
+  {
+    boolean isInitialPartition = partitions.iterator().next().getStats() == null;
+
+    if (!isInitialPartition) {
+      // support for dynamic partitioning requires lineage tracking
+      LOG.warn("Dynamic partitioning not implemented");
+      return partitions;
+    }
+
+    int totalCount;
+
+    //Get the size of the partition for parallel partitioning
+    if(incrementalCapacity != 0) {
+      totalCount = incrementalCapacity;
+    }
+    //Do normal partitioning
+    else {
+      totalCount = partitionCount;
+    }
+
+    Kryo lKryo = new Kryo();
+    Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount);
+    for (int i = 0; i < totalCount; i++) {
+      // Kryo.copy fails as it attempts to clone transient fields (input port)
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      Output output = new Output(bos);
+      lKryo.writeObject(output, this);
+      output.close();
+      Input lInput = new Input(bos.toByteArray());
+      @SuppressWarnings("unchecked")
+      AbstractSinglePortHDSWriter<EVENT> oper = lKryo.readObject(lInput, this.getClass());
+      newPartitions.add(new DefaultPartition<AbstractSinglePortHDSWriter<EVENT>>(oper));
+    }
+
+    // assign the partition keys
+    DefaultPartition.assignPartitionKeys(newPartitions, input);
+
+    for (Partition<AbstractSinglePortHDSWriter<EVENT>> p : newPartitions) {
+      PartitionKeys pks = p.getPartitionKeys().get(input);
+      p.getPartitionedInstance().partitionMask = pks.mask;
+      p.getPartitionedInstance().partitions = pks.partitions;
+    }
+
+    return newPartitions;
+  }
+
+  @Override
+  public void partitioned(Map<Integer, Partition<AbstractSinglePortHDSWriter<EVENT>>> arg0)
+  {
+  }
+
+}
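
For illustration, a minimal sketch of a derived operator supplying the codec
described in the class Javadoc above. The String event type, the class names,
and the byte layout are assumptions for the example, not part of this commit.

import java.nio.charset.StandardCharsets;

import com.datatorrent.common.util.Slice;

// Hypothetical subclass: events are plain strings, keyed by their UTF-8 bytes.
public class StringHDSWriter extends AbstractSinglePortHDSWriter<String>
{
  public static class StringCodec implements HDSCodec<String>
  {
    @Override
    public byte[] getKeyBytes(String event)
    {
      return event.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] getValueBytes(String event)
    {
      return new byte[0]; // the key carries all the data in this sketch
    }

    @Override
    public String fromKeyValue(Slice key, byte[] value)
    {
      return new String(key.buffer, key.offset, key.length, StandardCharsets.UTF_8);
    }

    @Override
    public Slice toByteArray(String event)
    {
      return new Slice(getKeyBytes(event));
    }

    @Override
    public Object fromByteArray(Slice fragment)
    {
      return new String(fragment.buffer, fragment.offset, fragment.length, StandardCharsets.UTF_8);
    }

    @Override
    public int getPartition(String event)
    {
      return event.hashCode(); // getBucketKey() applies the partition mask
    }
  }

  @Override
  protected HDSCodec<String> getCodec()
  {
    return new StringCodec();
  }
}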

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/HDHTFileAccess.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccess.java b/HDHTFileAccess.java
new file mode 100644
index 0000000..fc3d56f
--- /dev/null
+++ b/HDHTFileAccess.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.TreeMap;
+
+import com.datatorrent.common.util.Slice;
+
+/**
+ * Abstraction for file system and format interaction.
+ */
+public interface HDHTFileAccess extends Closeable
+{
+  void init();
+
+  DataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException;
+  DataInputStream getInputStream(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * Atomic file rename.
+   * @param bucketKey
+   * @param oldName
+   * @param newName
+   * @throws IOException
+   */
+  void rename(long bucketKey, String oldName, String newName) throws IOException;
+  void delete(long bucketKey, String fileName) throws IOException;
+
+  long getFileSize(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * HDHT Data File Format Reader
+   */
+  interface HDSFileReader extends Closeable
+  {
+    /**
+     * Read the entire contents of the underlying file into a TreeMap structure
+     * @param data
+     * @throws IOException
+     */
+    //Move to
+    // void readFully(TreeMap<Slice, Slice> data) throws IOException;
+    void readFully(TreeMap<Slice, byte[]> data) throws IOException;
+
+    /**
+     * Repositions the pointer to the beginning of the underlying file.
+     * @throws IOException
+     */
+    void reset() throws IOException;
+
+    /**
+     * Searches for a matching key and positions the pointer before the start of the key.
+     * @param key Slice representing the key
+     * @throws IOException
+     * @return true if the given key is found
+     */
+    boolean seek(Slice key) throws IOException;
+
+    /**
+     * Reads the next available key/value pair starting from the current pointer position
+     * into the Slice objects and advances the pointer to the next key. If the pointer is
+     * at the end of the file, false is returned and the Slice objects remain unmodified.
+     *
+     * @param key Empty slice object
+     * @param value Empty slice object
+     * @return True if key/value were successfully read, false otherwise
+     * @throws IOException
+     */
+    boolean next(Slice key, Slice value) throws IOException;
+
+  }
+
+  /**
+   * HDHT Data File Format Writer
+   */
+  interface HDSFileWriter extends Closeable {
+    /**
+     * Appends key/value pair to the underlying file.
+     * @param key
+     * @param value
+     * @throws IOException
+     */
+    void append(byte[] key, byte[] value) throws IOException;
+
+    /**
+     * Returns number of bytes written to the underlying stream.
+     * @return The bytes written.
+     * @throws IOException
+     */
+    long getBytesWritten() throws IOException;
+  }
+
+  /**
+   * Obtain a reader for the given data file. Since existing file formats may depend on the file system directly (vs.
+   * working solely with an InputStream), construction of the reader is part of the file system abstraction itself.
+   */
+  public HDSFileReader getReader(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * Obtain a writer for the given data file. Since existing file formats may depend on the file system directly (vs.
+   * working solely with an OutputStream), construction of the writer is part of the file system abstraction itself.
+   */
+  public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException;
+
+}
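
A possible call sequence for the reader contract above; the bucket key, file
name, and fall-back-to-full-scan policy are assumptions for the example.

// Scan a data file starting at startKey.
static void scan(HDHTFileAccess store, long bucketKey, String fileName, Slice startKey) throws IOException
{
  HDHTFileAccess.HDSFileReader reader = store.getReader(bucketKey, fileName);
  try {
    if (!reader.seek(startKey)) {
      reader.reset(); // key not present: this caller restarts from the beginning
    }
    Slice key = new Slice(new byte[0], 0, 0);
    Slice value = new Slice(new byte[0], 0, 0);
    while (reader.next(key, value)) {
      // key/value now reference the current entry; the pointer has advanced
      System.out.println(key.length + "-byte key, " + value.length + "-byte value");
    }
  } finally {
    reader.close();
  }
}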

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/HDHTFileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccessFSImpl.java b/HDHTFileAccessFSImpl.java
new file mode 100644
index 0000000..ad9aa05
--- /dev/null
+++ b/HDHTFileAccessFSImpl.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht;
+
+import java.io.IOException;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.common.util.DTThrowable;
+
+/**
+ * Hadoop file system backed store.
+ */
+abstract public class HDHTFileAccessFSImpl implements HDHTFileAccess
+{
+  @NotNull
+  private String basePath;
+  protected transient FileSystem fs;
+
+  public HDHTFileAccessFSImpl()
+  {
+  }
+
+  public String getBasePath()
+  {
+    return basePath;
+  }
+
+  public void setBasePath(String path)
+  {
+    this.basePath = path;
+  }
+
+  protected Path getFilePath(long bucketKey, String fileName) {
+    return new Path(getBucketPath(bucketKey), fileName);
+  }
+
+  protected Path getBucketPath(long bucketKey)
+  {
+    return new Path(basePath, Long.toString(bucketKey));
+  }
+
+  @Override
+  public long getFileSize(long bucketKey, String fileName) throws IOException {
+    return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen();
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    fs.close();
+  }
+
+  @Override
+  public void init()
+  {
+    if (fs == null) {
+      Path dataFilePath = new Path(basePath);
+      try {
+        fs = FileSystem.newInstance(dataFilePath.toUri(), new Configuration());
+      } catch (IOException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+  }
+
+  @Override
+  public void delete(long bucketKey, String fileName) throws IOException
+  {
+    fs.delete(getFilePath(bucketKey, fileName), true);
+  }
+
+  @Override
+  public FSDataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException
+  {
+    Path path = getFilePath(bucketKey, fileName);
+    return fs.create(path, true);
+  }
+
+  @Override
+  public FSDataInputStream getInputStream(long bucketKey, String fileName) throws IOException
+  {
+    return fs.open(getFilePath(bucketKey, fileName));
+  }
+
+  @Override
+  public void rename(long bucketKey, String fromName, String toName) throws IOException
+  {
+    FileContext fc = FileContext.getFileContext(fs.getUri());
+    Path bucketPath = getBucketPath(bucketKey);
+    // file context requires absolute path
+    if (!bucketPath.isAbsolute()) {
+      bucketPath = new Path(fs.getWorkingDirectory(), bucketPath);
+    }
+    fc.rename(new Path(bucketPath, fromName), new Path(bucketPath, toName), Rename.OVERWRITE);
+  }
+
+  @Override
+  public String toString()
+  {
+    return this.getClass().getSimpleName() + "[basePath=" + basePath + "]";
+  }
+
+}
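
A sketch of configuring and exercising this store; the concrete subclass
(TFileImpl, which extends this class, appears later in this commit), base
path, bucket id, and file names are assumptions for the example.

static void demo() throws IOException
{
  HDHTFileAccessFSImpl store = new TFileImpl.DTFileImpl();
  store.setBasePath("file:///tmp/hdht-demo"); // each bucket maps to a subdirectory
  store.init(); // creates the FileSystem instance on first use

  // Write under a temporary name, then publish atomically via rename.
  DataOutputStream out = store.getOutputStream(1L, "_tmp-part-0");
  out.write(new byte[] {1, 2, 3});
  out.close();
  store.rename(1L, "_tmp-part-0", "part-0");
  long size = store.getFileSize(1L, "part-0"); // 3 bytes
  store.close();
}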

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/DTFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/DTFileReader.java b/tfile/DTFileReader.java
new file mode 100644
index 0000000..fefadaf
--- /dev/null
+++ b/tfile/DTFileReader.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.contrib.hdht.tfile;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.file.tfile.DTFile;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
+import org.apache.hadoop.io.file.tfile.TFile;
+
+import com.datatorrent.common.util.Slice;
+import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
+
+/**
+ * {@link DTFile} wrapper for HDSFileReader
+ * <br>
+ * {@link DTFile} has the exact same format as {@link TFile}, with a much faster {@link Reader} implementation.
+ * <br>
+ * DTFileReader is also fully compatible with any file generated by {@link TFileWriter}, so there is no corresponding "DTFileWriter".
+ *
+ *
+ */
+public class DTFileReader implements HDSFileReader
+{
+  private final Reader reader;
+  private final Scanner scanner;
+  private final FSDataInputStream fsdis;
+
+  public DTFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
+  {
+    this.fsdis = fsdis;
+    reader = new Reader(fsdis, fileLength, conf);
+    scanner = reader.createScanner();
+  }
+
+  /**
+   * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
+   * @see java.io.Closeable#close()
+   */
+  @Override
+  public void close() throws IOException
+  {
+    scanner.close();
+    reader.close();
+    fsdis.close();
+  }
+
+  @Override
+  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
+  {
+    scanner.rewind();
+    for (; !scanner.atEnd(); scanner.advance()) {
+      Entry en = scanner.entry();
+      Slice key = new Slice(en.getBlockBuffer(), en.getKeyOffset(), en.getKeyLength());
+      byte[] value = Arrays.copyOfRange(en.getBlockBuffer(), en.getValueOffset(), en.getValueOffset() + en.getValueLength());
+      data.put(key, value);
+    }
+
+  }
+
+  @Override
+  public void reset() throws IOException
+  {
+    scanner.rewind();
+  }
+
+  @Override
+  public boolean seek(Slice key) throws IOException
+  {
+    return scanner.seekTo(key.buffer, key.offset, key.length);
+  }
+
+  @Override
+  public boolean next(Slice key, Slice value) throws IOException
+  {
+    if (scanner.atEnd()) return false;
+    Entry en = scanner.entry();
+
+    key.buffer = en.getBlockBuffer();
+    key.offset = en.getKeyOffset();
+    key.length = en.getKeyLength();
+
+    value.buffer = en.getBlockBuffer();
+    value.offset = en.getValueOffset();
+    value.length = en.getValueLength();
+
+    scanner.advance();
+    return true;
+  }
+}
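
readFully above fills a TreeMap keyed by Slice. Assuming Slice carries no
natural ordering, a caller would supply a byte-wise comparator matching the
"memcmp" key order the files are written in; a sketch, with reader being a
DTFileReader obtained as above:

Comparator<Slice> memcmp = new Comparator<Slice>()
{
  @Override
  public int compare(Slice a, Slice b)
  {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int diff = (a.buffer[a.offset + i] & 0xff) - (b.buffer[b.offset + i] & 0xff);
      if (diff != 0) {
        return diff;
      }
    }
    return a.length - b.length; // shorter key sorts first on a common prefix
  }
};
TreeMap<Slice, byte[]> data = new TreeMap<Slice, byte[]>(memcmp);
reader.readFully(data);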

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/TFileImpl.java
----------------------------------------------------------------------
diff --git a/tfile/TFileImpl.java b/tfile/TFileImpl.java
new file mode 100644
index 0000000..714a5b1
--- /dev/null
+++ b/tfile/TFileImpl.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht.tfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.file.tfile.DTFile;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+import com.datatorrent.contrib.hdht.HDHTFileAccessFSImpl;
+
+/**
+ * A TFile wrapper implementing the HDHTFileAccess API.
+ * <ul>
+ * <li>{@link TFileImpl.DefaultTFileImpl} returns the default TFile {@link Reader} and {@link Writer} for IO operations</li>
+ * <li>{@link TFileImpl.DTFileImpl} returns the DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader} (which is faster than the default TFile reader) and the TFile {@link Writer} for IO operations</li>
+ * </ul>
+ *
+ */
+public abstract class TFileImpl extends HDHTFileAccessFSImpl
+{
+  private int minBlockSize = 64 * 1024;
+
+  private String compressName = TFile.COMPRESSION_NONE;
+  
+  private String comparator = "memcmp";
+  
+  private int chunkSize = 1024 * 1024;
+  
+  private int inputBufferSize = 256 * 1024;
+  
+  private int outputBufferSize = 256 * 1024;
+
+  
+  private void setupConfig(Configuration conf)
+  {
+    conf.set("tfile.io.chunk.size", String.valueOf(chunkSize));
+    conf.set("tfile.fs.input.buffer.size", String.valueOf(inputBufferSize));
+    conf.set("tfile.fs.output.buffer.size", String.valueOf(outputBufferSize));
+  }
+
+
+  @Override
+  public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException
+  {
+    FSDataOutputStream fsdos = getOutputStream(bucketKey, fileName);
+    setupConfig(fs.getConf());
+    return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf());
+  }
+  
+  public int getMinBlockSize()
+  {
+    return minBlockSize;
+  }
+
+
+  public void setMinBlockSize(int minBlockSize)
+  {
+    this.minBlockSize = minBlockSize;
+  }
+
+
+  public String getCompressName()
+  {
+    return compressName;
+  }
+
+
+  public void setCompressName(String compressName)
+  {
+    this.compressName = compressName;
+  }
+
+
+  public String getComparator()
+  {
+    return comparator;
+  }
+
+
+  public void setComparator(String comparator)
+  {
+    this.comparator = comparator;
+  }
+
+
+  public int getChunkSize()
+  {
+    return chunkSize;
+  }
+
+
+  public void setChunkSize(int chunkSize)
+  {
+    this.chunkSize = chunkSize;
+  }
+
+
+  public int getInputBufferSize()
+  {
+    return inputBufferSize;
+  }
+
+
+  public void setInputBufferSize(int inputBufferSize)
+  {
+    this.inputBufferSize = inputBufferSize;
+  }
+
+
+  public int getOutputBufferSize()
+  {
+    return outputBufferSize;
+  }
+
+
+  public void setOutputBufferSize(int outputBufferSize)
+  {
+    this.outputBufferSize = outputBufferSize;
+  }
+  
+  /**
+   * Return {@link TFile} {@link Reader}
+   *
+   */
+  public static class DefaultTFileImpl extends TFileImpl{
+    
+    @Override
+    public HDSFileReader getReader(long bucketKey, String fileName) throws IOException
+    {
+      FSDataInputStream fsdis =  getInputStream(bucketKey, fileName);
+      long fileLength = getFileSize(bucketKey, fileName);
+      super.setupConfig(fs.getConf());
+      return new TFileReader(fsdis, fileLength, fs.getConf());
+    }
+    
+  }
+  
+  
+  /**
+   * Return {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}
+   *
+   */
+  public static class DTFileImpl extends TFileImpl {
+    
+    @Override
+    public HDSFileReader getReader(long bucketKey, String fileName) throws IOException
+    {
+      FSDataInputStream fsdis =  getInputStream(bucketKey, fileName);
+      long fileLength = getFileSize(bucketKey, fileName);
+      super.setupConfig(fs.getConf());
+      return new DTFileReader(fsdis, fileLength, fs.getConf());
+    }
+    
+  }
+
+
+}
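
A possible end-to-end use of the two implementations above; the base path,
bucket id, and file names are assumptions for the example.

static void roundTrip() throws IOException
{
  TFileImpl impl = new TFileImpl.DTFileImpl(); // TFile on disk, DTFile reader
  impl.setBasePath("file:///tmp/hdht-tfile");
  impl.init();

  HDHTFileAccess.HDSFileWriter out = impl.getWriter(1L, "part-0");
  out.append("a".getBytes(), "1".getBytes()); // a TFile written with a comparator
  out.append("b".getBytes(), "2".getBytes()); // expects keys in ascending order
  out.close();

  HDHTFileAccess.HDSFileReader in = impl.getReader(1L, "part-0");
  Slice key = new Slice(new byte[0], 0, 0);
  Slice value = new Slice(new byte[0], 0, 0);
  while (in.next(key, value)) {
    // entries come back in "memcmp" key order
  }
  in.close();
  impl.close();
}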

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/TFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java
new file mode 100644
index 0000000..d20408c
--- /dev/null
+++ b/tfile/TFileReader.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht.tfile;
+
+import java.io.IOException;
+import java.util.TreeMap;
+
+import com.datatorrent.common.util.Slice;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry;
+
+import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
+
+public class TFileReader implements HDSFileReader
+{
+
+  private final Reader reader;
+  private final Scanner scanner;
+  private final FSDataInputStream fsdis;
+
+  public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
+  {
+    this.fsdis = fsdis;
+    reader = new Reader(fsdis, fileLength, conf);
+    scanner = reader.createScanner();
+  }
+
+  /**
+   * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
+   * @see java.io.Closeable#close()
+   */
+  @Override
+  public void close() throws IOException
+  {
+    scanner.close();
+    reader.close();
+    fsdis.close();
+  }
+
+  @Override
+  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
+  {
+    scanner.rewind();
+    for (; !scanner.atEnd(); scanner.advance()) {
+      Entry en = scanner.entry();
+      int klen = en.getKeyLength();
+      int vlen = en.getValueLength();
+      byte[] key = new byte[klen];
+      byte[] value = new byte[vlen];
+      en.getKey(key);
+      en.getValue(value);
+      data.put(new Slice(key, 0, key.length), value);
+    }
+
+  }
+
+  @Override
+  public void reset() throws IOException
+  {
+    scanner.rewind();
+  }
+
+  @Override
+  public boolean seek(Slice key) throws IOException
+  {
+    return scanner.seekTo(key.buffer, key.offset, key.length);
+  }
+
+  @Override
+  public boolean next(Slice key, Slice value) throws IOException
+  {
+    if (scanner.atEnd()) return false;
+    Entry en = scanner.entry();
+    byte[] rkey = new byte[en.getKeyLength()];
+    byte[] rval = new byte[en.getValueLength()];
+    en.getKey(rkey);
+    en.getValue(rval);
+
+    key.buffer = rkey;
+    key.offset = 0;
+    key.length = en.getKeyLength();
+
+    value.buffer = rval;
+    value.offset = 0;
+    value.length = en.getValueLength();
+
+    scanner.advance();
+    return true;
+  }
+
+}
+
+
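
One behavioral note relative to DTFileReader above: TFileReader copies every
entry into freshly allocated arrays, so the buffers handed out by next() may
be retained across iterations, while DTFileReader's slices alias the shared
block buffer. A sketch under that reading of the code, with reader being a
TFileReader obtained as above:

Slice key = new Slice(new byte[0], 0, 0);
Slice value = new Slice(new byte[0], 0, 0);
List<Slice> retained = new ArrayList<Slice>();
while (reader.next(key, value)) {
  // Safe with TFileReader: key.buffer is a private copy for this entry.
  // With DTFileReader the bytes would need an explicit copy here.
  retained.add(new Slice(key.buffer, key.offset, key.length));
}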

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/TFileWriter.java
----------------------------------------------------------------------
diff --git a/tfile/TFileWriter.java b/tfile/TFileWriter.java
new file mode 100644
index 0000000..b6fd90d
--- /dev/null
+++ b/tfile/TFileWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht.tfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileWriter;
+
+public final class TFileWriter implements HDSFileWriter
+{
+  private Writer writer;
+  
+  private FSDataOutputStream fsdos;
+  
+  public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException
+  {
+    this.fsdos = stream;
+    writer = new Writer(stream, minBlockSize, compressName, comparator, conf);
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    writer.close();
+    fsdos.close();
+  }
+
+  @Override
+  public void append(byte[] key, byte[] value) throws IOException
+  {
+    writer.append(key, value);
+  }
+
+  @Override
+  public long getBytesWritten() throws IOException
+  {
+    return fsdos.getPos();
+  }
+
+}
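
A sketch of driving the writer above; the file names and size threshold are
assumptions for the example. Because the writer is constructed with a
comparator ("memcmp" via TFileImpl), keys are appended in ascending byte
order, and getBytesWritten() lets a caller decide when to roll to a new file.

HDHTFileAccess.HDSFileWriter writer = fileAccess.getWriter(bucketKey, "part-0.tmp");
try {
  writer.append("apple".getBytes(), "1".getBytes());
  writer.append("banana".getBytes(), "2".getBytes()); // ascending key order
  if (writer.getBytesWritten() > 64L * 1024 * 1024) {
    // past an assumed 64 MB threshold: close and start a new file
  }
} finally {
  writer.close();
}
fileAccess.rename(bucketKey, "part-0.tmp", "part-0"); // publish atomically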


[4/7] incubator-apex-malhar git commit: MLHR-1721: Convert NullPointerException to IOException during seek to avoid operator failure.

Posted by hs...@apache.org.
MLHR-1721: Convert NullPointerException to IOException during seek
to avoid operator failure.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/bd9cd5c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/bd9cd5c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/bd9cd5c9

Branch: refs/heads/devel-3
Commit: bd9cd5c932e44c401b8a334e0d4e7736c2980ea7
Parents: 818c683
Author: Tushar Gosavi <tu...@datatorrent.com>
Authored: Thu Apr 30 11:11:41 2015 +0530
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 23 21:37:02 2015 -0800

----------------------------------------------------------------------
 tfile/TFileReader.java | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bd9cd5c9/tfile/TFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java
index 972b4f9..2f06da9 100644
--- a/tfile/TFileReader.java
+++ b/tfile/TFileReader.java
@@ -18,6 +18,7 @@ package com.datatorrent.contrib.hdht.tfile;
 import java.io.IOException;
 import java.util.TreeMap;
 
+import com.datatorrent.common.util.DTThrowable;
 import com.datatorrent.common.util.Slice;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,7 @@ public class TFileReader implements HDSFileReader
   private final Reader reader;
   private final Scanner scanner;
   private final FSDataInputStream fsdis;
+  private boolean closed = false;
 
   public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
   {
@@ -54,6 +56,7 @@ public class TFileReader implements HDSFileReader
   @Override
   public void close() throws IOException
   {
+    closed = true;
     scanner.close();
     reader.close();
     fsdis.close();
@@ -85,7 +88,14 @@ public class TFileReader implements HDSFileReader
   @Override
   public boolean seek(Slice key) throws IOException
   {
-    return scanner.seekTo(key.buffer, key.offset, key.length);
+    try {
+      return scanner.seekTo(key.buffer, key.offset, key.length);
+    } catch (NullPointerException ex) {
+      if (closed)
+        throw new IOException("Stream was closed");
+      else
+        throw ex;
+    }
   }
 
   @Override
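
With this change, a seek() racing with close() surfaces as an IOException
rather than a NullPointerException, so it can be treated as a recoverable
I/O failure. A caller-side sketch (assumed, not from this commit):

boolean found;
try {
  found = reader.seek(key);
} catch (IOException ex) {
  // the reader was closed underneath us; reopen and retry once
  reader = fileAccess.getReader(bucketKey, fileName);
  found = reader.seek(key);
}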


[5/7] incubator-apex-malhar git commit: Adding missing @since tags for 2.0.0 release

Posted by hs...@apache.org.
Adding missing @since tags for 2.0.0 release


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/818c683e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/818c683e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/818c683e

Branch: refs/heads/devel-3
Commit: 818c683e051233e92f5b1d3263d6f8e222d3d9b9
Parents: 1576ce7
Author: sashadt <sa...@datatorrent.com>
Authored: Fri Jan 30 18:44:47 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 23 21:37:02 2015 -0800

----------------------------------------------------------------------
 HDHTFileAccess.java       | 2 ++
 HDHTFileAccessFSImpl.java | 2 ++
 tfile/DTFileReader.java   | 1 +
 tfile/TFileImpl.java      | 1 +
 tfile/TFileReader.java    | 5 +++++
 tfile/TFileWriter.java    | 5 +++++
 6 files changed, 16 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/818c683e/HDHTFileAccess.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccess.java b/HDHTFileAccess.java
index fc3d56f..a26c1a7 100644
--- a/HDHTFileAccess.java
+++ b/HDHTFileAccess.java
@@ -25,6 +25,8 @@ import com.datatorrent.common.util.Slice;
 
 /**
  * Abstraction for file system and format interaction.
+ *
+ * @since 2.0.0
  */
 public interface HDHTFileAccess extends Closeable
 {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/818c683e/HDHTFileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccessFSImpl.java b/HDHTFileAccessFSImpl.java
index ad9aa05..5c9cbfa 100644
--- a/HDHTFileAccessFSImpl.java
+++ b/HDHTFileAccessFSImpl.java
@@ -31,6 +31,8 @@ import com.datatorrent.common.util.DTThrowable;
 
 /**
  * Hadoop file system backed store.
+ *
+ * @since 2.0.0
  */
 abstract public class HDHTFileAccessFSImpl implements HDHTFileAccess
 {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/818c683e/tfile/DTFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/DTFileReader.java b/tfile/DTFileReader.java
index fefadaf..a164b98 100644
--- a/tfile/DTFileReader.java
+++ b/tfile/DTFileReader.java
@@ -39,6 +39,7 @@ import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
  * DTFileReader is also fully compatible with any file generated by {@link TFileWriter}, so there is no corresponding "DTFileWriter".
  *
  *
+ * @since 2.0.0
  */
 public class DTFileReader implements HDSFileReader
 {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/818c683e/tfile/TFileImpl.java
----------------------------------------------------------------------
diff --git a/tfile/TFileImpl.java b/tfile/TFileImpl.java
index 714a5b1..5dc9464 100644
--- a/tfile/TFileImpl.java
+++ b/tfile/TFileImpl.java
@@ -34,6 +34,7 @@ import com.datatorrent.contrib.hdht.HDHTFileAccessFSImpl;
  * <li>{@link TFileImpl.DTFileImpl} returns the DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader} (which is faster than the default TFile reader) and the TFile {@link Writer} for IO operations</li>
  * </ul>
  *
+ * @since 2.0.0
  */
 public abstract class TFileImpl extends HDHTFileAccessFSImpl
 {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/818c683e/tfile/TFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java
index d20408c..972b4f9 100644
--- a/tfile/TFileReader.java
+++ b/tfile/TFileReader.java
@@ -28,6 +28,11 @@ import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry;
 
 import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
 
+/**
+ * TFileReader
+ *
+ * @since 2.0.0
+ */
 public class TFileReader implements HDSFileReader
 {
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/818c683e/tfile/TFileWriter.java
----------------------------------------------------------------------
diff --git a/tfile/TFileWriter.java b/tfile/TFileWriter.java
index b6fd90d..549e1b8 100644
--- a/tfile/TFileWriter.java
+++ b/tfile/TFileWriter.java
@@ -23,6 +23,11 @@ import org.apache.hadoop.io.file.tfile.TFile.Writer;
 
 import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileWriter;
 
+/**
+ * TFileWriter
+ *
+ * @since 2.0.0
+ */
 public final class TFileWriter implements HDSFileWriter
 {
   private Writer writer;


[3/7] incubator-apex-malhar git commit: HDHT rename.

Posted by hs...@apache.org.
HDHT rename.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/1576ce7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/1576ce7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/1576ce7d

Branch: refs/heads/devel-3
Commit: 1576ce7d5366f1a0b79b72511956604092260c00
Parents: 4e47d23
Author: thomas <th...@datatorrent.com>
Authored: Fri Dec 19 17:43:21 2014 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 23 21:37:02 2015 -0800

----------------------------------------------------------------------
 AbstractSinglePortHDSWriter.java | 194 ----------------------------------
 1 file changed, 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1576ce7d/AbstractSinglePortHDSWriter.java
----------------------------------------------------------------------
diff --git a/AbstractSinglePortHDSWriter.java b/AbstractSinglePortHDSWriter.java
deleted file mode 100644
index 04fa602..0000000
--- a/AbstractSinglePortHDSWriter.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import javax.validation.constraints.Min;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultPartition;
-import com.datatorrent.api.Partitioner;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.common.util.Slice;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.collect.Lists;
-
-/**
- * Operator that receives data on its input port and writes it to the data store.
- * Implements partitioning and maps the partition key to the store bucket.
- * The derived class supplies the codec for partitioning and key-value serialization.
- * @param <EVENT>
- */
-public abstract class AbstractSinglePortHDSWriter<EVENT> extends HDHTWriter implements Partitioner<AbstractSinglePortHDSWriter<EVENT>>
-{
-  public interface HDSCodec<EVENT> extends StreamCodec<EVENT>
-  {
-    byte[] getKeyBytes(EVENT event);
-    byte[] getValueBytes(EVENT event);
-    EVENT fromKeyValue(Slice key, byte[] value);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractSinglePortHDSWriter.class);
-
-  protected int partitionMask;
-
-  protected Set<Integer> partitions;
-
-  protected transient HDSCodec<EVENT> codec;
-
-  @Min(1)
-  private int partitionCount = 1;
-
-  public final transient DefaultInputPort<EVENT> input = new DefaultInputPort<EVENT>()
-  {
-    @Override
-    public void process(EVENT event)
-    {
-      try {
-        processEvent(event);
-      } catch (IOException e) {
-        throw new RuntimeException("Error processing " + event, e);
-      }
-    }
-
-    @Override
-    public StreamCodec<EVENT> getStreamCodec()
-    {
-      return getCodec();
-    }
-  };
-
-  public void setPartitionCount(int partitionCount)
-  {
-    this.partitionCount = partitionCount;
-  }
-
-  public int getPartitionCount()
-  {
-    return partitionCount;
-  }
-
-  /**
-   * Storage bucket for the given event. Only one partition can write to a storage bucket and by default it is
-   * identified by the partition id.
-   *
-   * @param event
-   * @return The bucket key.
-   */
-  protected long getBucketKey(EVENT event)
-  {
-    return (codec.getPartition(event) & partitionMask);
-  }
-
-  protected void processEvent(EVENT event) throws IOException
-  {
-    byte[] key = codec.getKeyBytes(event);
-    byte[] value = codec.getValueBytes(event);
-    super.put(getBucketKey(event), new Slice(key), value);
-  }
-
-  abstract protected HDSCodec<EVENT> getCodec();
-
-  @Override
-  public void setup(OperatorContext arg0)
-  {
-    LOG.debug("Store {} with partitions {}", super.getFileStore(), new PartitionKeys(this.partitionMask, this.partitions));
-    super.setup(arg0);
-    try {
-      this.codec = getCodec();
-      // inject the operator reference, if such field exists
-      // TODO: replace with broader solution
-      Class<?> cls = this.codec.getClass();
-      while (cls != null) {
-        for (Field field : cls.getDeclaredFields()) {
-          if (field.getType().isAssignableFrom(this.getClass())) {
-            field.setAccessible(true);
-            field.set(this.codec, this);
-          }
-        }
-        cls = cls.getSuperclass();
-      }
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to create codec", e);
-    }
-  }
-
-  @Override
-  public Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> definePartitions(Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> partitions, int incrementalCapacity)
-  {
-    boolean isInitialPartition = partitions.iterator().next().getStats() == null;
-
-    if (!isInitialPartition) {
-      // support for dynamic partitioning requires lineage tracking
-      LOG.warn("Dynamic partitioning not implemented");
-      return partitions;
-    }
-
-    int totalCount;
-
-    //Get the size of the partition for parallel partitioning
-    if(incrementalCapacity != 0) {
-      totalCount = incrementalCapacity;
-    }
-    //Do normal partitioning
-    else {
-      totalCount = partitionCount;
-    }
-
-    Kryo lKryo = new Kryo();
-    Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount);
-    for (int i = 0; i < totalCount; i++) {
-      // Kryo.copy fails as it attempts to clone transient fields (input port)
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      Output output = new Output(bos);
-      lKryo.writeObject(output, this);
-      output.close();
-      Input lInput = new Input(bos.toByteArray());
-      @SuppressWarnings("unchecked")
-      AbstractSinglePortHDSWriter<EVENT> oper = lKryo.readObject(lInput, this.getClass());
-      newPartitions.add(new DefaultPartition<AbstractSinglePortHDSWriter<EVENT>>(oper));
-    }
-
-    // assign the partition keys
-    DefaultPartition.assignPartitionKeys(newPartitions, input);
-
-    for (Partition<AbstractSinglePortHDSWriter<EVENT>> p : newPartitions) {
-      PartitionKeys pks = p.getPartitionKeys().get(input);
-      p.getPartitionedInstance().partitionMask = pks.mask;
-      p.getPartitionedInstance().partitions = pks.partitions;
-    }
-
-    return newPartitions;
-  }
-
-  @Override
-  public void partitioned(Map<Integer, Partition<AbstractSinglePortHDSWriter<EVENT>>> arg0)
-  {
-  }
-
-}


[2/7] incubator-apex-malhar git commit: make the code compile with open source Apex

Posted by hs...@apache.org.
make the code compile with open source Apex


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c7874616
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c7874616
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c7874616

Branch: refs/heads/devel-3
Commit: c7874616a300f84aac44af62eac4b0b534e547aa
Parents: bd9cd5c
Author: Chetan Narsude <ch...@datatorrent.com>
Authored: Mon Jun 22 17:07:41 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 23 21:37:02 2015 -0800

----------------------------------------------------------------------
 HDHTFileAccess.java       | 2 +-
 HDHTFileAccessFSImpl.java | 2 +-
 tfile/DTFileReader.java   | 2 +-
 tfile/TFileReader.java    | 4 ++--
 4 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c7874616/HDHTFileAccess.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccess.java b/HDHTFileAccess.java
index a26c1a7..266ba75 100644
--- a/HDHTFileAccess.java
+++ b/HDHTFileAccess.java
@@ -21,7 +21,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.TreeMap;
 
-import com.datatorrent.common.util.Slice;
+import com.datatorrent.netlet.util.Slice;
 
 /**
  * Abstraction for file system and format interaction.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c7874616/HDHTFileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccessFSImpl.java b/HDHTFileAccessFSImpl.java
index 5c9cbfa..13dd0ad 100644
--- a/HDHTFileAccessFSImpl.java
+++ b/HDHTFileAccessFSImpl.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
 
-import com.datatorrent.common.util.DTThrowable;
+import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * Hadoop file system backed store.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c7874616/tfile/DTFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/DTFileReader.java b/tfile/DTFileReader.java
index a164b98..e61d475 100644
--- a/tfile/DTFileReader.java
+++ b/tfile/DTFileReader.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
 import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
 import org.apache.hadoop.io.file.tfile.TFile;
 
-import com.datatorrent.common.util.Slice;
+import com.datatorrent.netlet.util.Slice;
 import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c7874616/tfile/TFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java
index 2f06da9..0994666 100644
--- a/tfile/TFileReader.java
+++ b/tfile/TFileReader.java
@@ -18,8 +18,8 @@ package com.datatorrent.contrib.hdht.tfile;
 import java.io.IOException;
 import java.util.TreeMap;
 
-import com.datatorrent.common.util.DTThrowable;
-import com.datatorrent.common.util.Slice;
+import com.datatorrent.netlet.util.DTThrowable;
+import com.datatorrent.netlet.util.Slice;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;


[6/7] incubator-apex-malhar git commit: MLHR-1916 #resolve #comment Added back the FileAccess api and its implementations

Posted by hs...@apache.org.
MLHR-1916 #resolve #comment Added back the FileAccess api and its implementations


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/7d2f4749
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/7d2f4749
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/7d2f4749

Branch: refs/heads/devel-3
Commit: 7d2f47491498c6b1c550f70e626dd76ba1db393e
Parents: c787461
Author: MalharJenkins <je...@datatorrent.com>
Authored: Mon Nov 23 21:14:41 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 23 22:11:18 2015 -0800

----------------------------------------------------------------------
 HDHTFileAccess.java                             | 124 -------------
 HDHTFileAccessFSImpl.java                       | 127 -------------
 .../lib/fileaccess/DTFileReader.java            | 112 ++++++++++++
 .../datatorrent/lib/fileaccess/FileAccess.java  | 129 ++++++++++++++
 .../lib/fileaccess/FileAccessFSImpl.java        | 130 ++++++++++++++
 .../datatorrent/lib/fileaccess/TFileImpl.java   | 178 +++++++++++++++++++
 .../datatorrent/lib/fileaccess/TFileReader.java | 125 +++++++++++++
 .../datatorrent/lib/fileaccess/TFileWriter.java |  61 +++++++
 pom.xml                                         |   2 +-
 tfile/DTFileReader.java                         | 111 ------------
 tfile/TFileImpl.java                            | 177 ------------------
 tfile/TFileReader.java                          | 125 -------------
 tfile/TFileWriter.java                          |  60 -------
 13 files changed, 736 insertions(+), 725 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/HDHTFileAccess.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccess.java b/HDHTFileAccess.java
deleted file mode 100644
index 266ba75..0000000
--- a/HDHTFileAccess.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht;
-
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.TreeMap;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * Abstraction for file system and format interaction.
- *
- * @since 2.0.0
- */
-public interface HDHTFileAccess extends Closeable
-{
-  void init();
-
-  DataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException;
-  DataInputStream getInputStream(long bucketKey, String fileName) throws IOException;
-
-  /**
-   * Atomic file rename.
-   * @param bucketKey
-   * @param oldName
-   * @param newName
-   * @throws IOException
-   */
-  void rename(long bucketKey, String oldName, String newName) throws IOException;
-  void delete(long bucketKey, String fileName) throws IOException;
-
-  long getFileSize(long bucketKey, String fileName) throws IOException;
-
-  /**
-   * HDHT Data File Format Reader
-   */
-  interface HDSFileReader extends Closeable
-  {
-    /**
-     * Read the entire contents of the underlying file into a TreeMap structure
-     * @param data
-     * @throws IOException
-     */
-    //Move to
-    // void readFully(TreeMap<Slice, Slice> data) throws IOException;
-    void readFully(TreeMap<Slice, byte[]> data) throws IOException;
-
-    /**
-     * Repositions the pointer to the beginning of the underlying file.
-     * @throws IOException
-     */
-    void reset() throws IOException;
-
-    /**
-     * Searches for a matching key and positions the pointer before the start of the key.
-     * @param key Slice representing the key
-     * @throws IOException
-     * @return true if the given key is found
-     */
-    boolean seek(Slice key) throws IOException;
-
-    /**
-     * Reads the next available key/value pair starting from the current pointer position
-     * into the Slice objects and advances the pointer to the next key. If the pointer is
-     * at the end of the file, false is returned and the Slice objects remain unmodified.
-     *
-     * @param key Empty slice object
-     * @param value Empty slice object
-     * @return True if key/value were successfully read, false otherwise
-     * @throws IOException
-     */
-    boolean next(Slice key, Slice value) throws IOException;
-
-  }
-
-  /**
-   * HDHT Data File Format Writer
-   */
-  interface HDSFileWriter extends Closeable {
-    /**
-     * Appends key/value pair to the underlying file.
-     * @param key
-     * @param value
-     * @throws IOException
-     */
-    void append(byte[] key, byte[] value) throws IOException;
-
-    /**
-     * Returns number of bytes written to the underlying stream.
-     * @return The bytes written.
-     * @throws IOException
-     */
-    long getBytesWritten() throws IOException;
-  }
-
-  /**
-   * Obtain a reader for the given data file. Since existing file formats may depend on the file system directly (vs.
-   * working solely with an InputStream), construction of the reader is part of the file system abstraction itself.
-   */
-  public HDSFileReader getReader(long bucketKey, String fileName) throws IOException;
-
-  /**
-   * Obtain a writer for the given data file. Since existing file formats may depend on the file system directly (vs.
-   * working solely with an OutputStream), construction of the writer is part of the file system abstraction itself.
-   */
-  public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/HDHTFileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccessFSImpl.java b/HDHTFileAccessFSImpl.java
deleted file mode 100644
index 13dd0ad..0000000
--- a/HDHTFileAccessFSImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht;
-
-import java.io.IOException;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.Path;
-
-import com.datatorrent.netlet.util.DTThrowable;
-
-/**
- * Hadoop file system backed store.
- *
- * @since 2.0.0
- */
-abstract public class HDHTFileAccessFSImpl implements HDHTFileAccess
-{
-  @NotNull
-  private String basePath;
-  protected transient FileSystem fs;
-
-  public HDHTFileAccessFSImpl()
-  {
-  }
-
-  public String getBasePath()
-  {
-    return basePath;
-  }
-
-  public void setBasePath(String path)
-  {
-    this.basePath = path;
-  }
-
-  protected Path getFilePath(long bucketKey, String fileName) {
-    return new Path(getBucketPath(bucketKey), fileName);
-  }
-
-  protected Path getBucketPath(long bucketKey)
-  {
-    return new Path(basePath, Long.toString(bucketKey));
-  }
-
-  @Override
-  public long getFileSize(long bucketKey, String fileName) throws IOException {
-    return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen();
-  }
-
-  @Override
-  public void close() throws IOException
-  {
-    fs.close();
-  }
-
-  @Override
-  public void init()
-  {
-    if (fs == null) {
-      Path dataFilePath = new Path(basePath);
-      try {
-        fs = FileSystem.newInstance(dataFilePath.toUri(), new Configuration());
-      } catch (IOException e) {
-        DTThrowable.rethrow(e);
-      }
-    }
-  }
-
-  @Override
-  public void delete(long bucketKey, String fileName) throws IOException
-  {
-    fs.delete(getFilePath(bucketKey, fileName), true);
-  }
-
-  @Override
-  public FSDataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException
-  {
-    Path path = getFilePath(bucketKey, fileName);
-    return fs.create(path, true);
-  }
-
-  @Override
-  public FSDataInputStream getInputStream(long bucketKey, String fileName) throws IOException
-  {
-    return fs.open(getFilePath(bucketKey, fileName));
-  }
-
-  @Override
-  public void rename(long bucketKey, String fromName, String toName) throws IOException
-  {
-    FileContext fc = FileContext.getFileContext(fs.getUri());
-    Path bucketPath = getBucketPath(bucketKey);
-    // file context requires absolute path
-    if (!bucketPath.isAbsolute()) {
-      bucketPath = new Path(fs.getWorkingDirectory(), bucketPath);
-    }
-    fc.rename(new Path(bucketPath, fromName), new Path(bucketPath, toName), Rename.OVERWRITE);
-  }
-
-  @Override
-  public String toString()
-  {
-    return this.getClass().getSimpleName() + "[basePath=" + basePath + "]";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java b/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
new file mode 100644
index 0000000..cb97520
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.file.tfile.DTFile;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
+import org.apache.hadoop.io.file.tfile.TFile;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * {@link DTFile} wrapper for {@link FileAccess.FileReader}
+ * <br>
+ * {@link DTFile} has the exact same format as {@link TFile} but provides a much faster {@link Reader} implementation.
+ * <br>
+ * DTFileReader is therefore fully compatible with any file generated by {@link TFileWriter}, so there is no corresponding "DTFileWriter".
+ *
+ *
+ * @since 2.0.0
+ */
+public class DTFileReader implements FileAccess.FileReader
+{
+  private final Reader reader;
+  private final Scanner scanner;
+  private final FSDataInputStream fsdis;
+
+  public DTFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
+  {
+    this.fsdis = fsdis;
+    reader = new Reader(fsdis, fileLength, conf);
+    scanner = reader.createScanner();
+  }
+
+  /**
+   * Unlike the TFile.Reader.close method, this also closes the wrapped InputStream.
+   * @see java.io.Closeable#close()
+   */
+  @Override
+  public void close() throws IOException
+  {
+    scanner.close();
+    reader.close();
+    fsdis.close();
+  }
+
+  @Override
+  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
+  {
+    scanner.rewind();
+    for (; !scanner.atEnd(); scanner.advance()) {
+      Entry en = scanner.entry();
+      Slice key = new Slice(en.getBlockBuffer(), en.getKeyOffset(), en.getKeyLength());
+      byte[] value = Arrays.copyOfRange(en.getBlockBuffer(), en.getValueOffset(), en.getValueOffset() + en.getValueLength());
+      data.put(key, value);
+    }
+
+  }
+
+  @Override
+  public void reset() throws IOException
+  {
+    scanner.rewind();
+  }
+
+  @Override
+  public boolean seek(Slice key) throws IOException
+  {
+    return scanner.seekTo(key.buffer, key.offset, key.length);
+  }
+
+  @Override
+  public boolean next(Slice key, Slice value) throws IOException
+  {
+    if (scanner.atEnd()) return false;
+    Entry en = scanner.entry();
+
+    key.buffer = en.getBlockBuffer();
+    key.offset = en.getKeyOffset();
+    key.length = en.getKeyLength();
+
+    value.buffer = en.getBlockBuffer();
+    value.offset = en.getValueOffset();
+    value.length = en.getValueLength();
+
+    scanner.advance();
+    return true;
+  }
+}
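
For illustration, a point-lookup sketch against this reader; the path, key, and configuration below are assumptions for the example, not part of this commit. Note that next() hands back views into DTFile's cached block buffer, so callers must copy the bytes out if they need to outlive the scan.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  import com.datatorrent.lib.fileaccess.DTFileReader;
  import com.datatorrent.netlet.util.Slice;

  public class DTFileLookupSketch
  {
    public static void main(String[] args) throws Exception
    {
      // Open an existing TFile/DTFile (hypothetical path).
      Configuration conf = new Configuration();
      Path path = new Path("/tmp/store/1/0.tfile");
      FileSystem fs = FileSystem.get(path.toUri(), conf);
      FSDataInputStream in = fs.open(path);
      long length = fs.getFileStatus(path).getLen();

      DTFileReader reader = new DTFileReader(in, length, conf);
      byte[] wanted = "key-1".getBytes();
      Slice key = new Slice(null, 0, 0);
      Slice value = new Slice(null, 0, 0);
      // seek() positions the scanner before the matching key; next() fills both
      // slices with views into the block buffer and advances the scanner.
      if (reader.seek(new Slice(wanted, 0, wanted.length)) && reader.next(key, value)) {
        System.out.println(new String(value.buffer, value.offset, value.length));
      }
      reader.close(); // also closes the wrapped input stream
    }
  }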

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
new file mode 100644
index 0000000..4b7f6e5
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.TreeMap;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstraction for file system and format interaction.
+ *
+ * @since 2.0.0
+ */
+public interface FileAccess extends Closeable
+{
+  void init();
+
+  DataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException;
+
+  DataInputStream getInputStream(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * Atomically renames a file within the given bucket.
+   * @param bucketKey bucket containing the file
+   * @param oldName current file name
+   * @param newName new file name
+   * @throws IOException
+   */
+  void rename(long bucketKey, String oldName, String newName) throws IOException;
+  void delete(long bucketKey, String fileName) throws IOException;
+
+  long getFileSize(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * Data File Format Reader
+   */
+  interface FileReader extends Closeable
+  {
+    /**
+     * Reads the entire contents of the underlying file into a TreeMap structure.
+     * @param data map to populate with all key/value pairs of the file
+     * @throws IOException
+     */
+    // TODO: move to
+    // void readFully(TreeMap<Slice, Slice> data) throws IOException;
+    void readFully(TreeMap<Slice, byte[]> data) throws IOException;
+
+    /**
+     * Repositions the pointer to the beginning of the underlying file.
+     * @throws IOException
+     */
+    void reset() throws IOException;
+
+    /**
+     * Searches for a matching key and positions the pointer before the start of the key.
+     * @param key Slice representing the key
+     * @return true if the given key is found
+     * @throws IOException
+     */
+    boolean seek(Slice key) throws IOException;
+
+    /**
+     * Reads the next available key/value pair starting from the current pointer position
+     * into the Slice objects and advances the pointer to the next key. If the pointer is
+     * at the end of the file, false is returned and the Slice objects remain unmodified.
+     *
+     * @param key empty Slice to be filled with the key
+     * @param value empty Slice to be filled with the value
+     * @return true if a key/value pair was read, false otherwise
+     * @throws IOException
+     */
+    boolean next(Slice key, Slice value) throws IOException;
+
+  }
+
+  /**
+   * Data File Format Writer
+   */
+  interface FileWriter extends Closeable
+  {
+    /**
+     * Appends a key/value pair to the underlying file.
+     * @param key the key bytes
+     * @param value the value bytes
+     * @throws IOException
+     */
+    void append(byte[] key, byte[] value) throws IOException;
+
+    /**
+     * Returns the number of bytes written to the underlying stream.
+     * @return The bytes written.
+     * @throws IOException
+     */
+    long getBytesWritten() throws IOException;
+  }
+
+  /**
+   * Obtain a reader for the given data file. Since existing file formats may depend on the file system directly
+   * (rather than working against a plain InputStream), construction of the reader is part of the file system
+   * abstraction itself.
+   */
+  public FileReader getReader(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * Obtain a writer for the given data file. Since existing file formats may depend on the file system directly
+   * (rather than working against a plain OutputStream), construction of the writer is part of the file system
+   * abstraction itself.
+   */
+  public FileWriter getWriter(long bucketKey, String fileName) throws IOException;
+
+}
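
For illustration, a minimal end-to-end sketch of this API; the store type, base path, bucket key, and file names are assumptions for the example, not part of this commit.

  import com.datatorrent.lib.fileaccess.FileAccess;
  import com.datatorrent.lib.fileaccess.TFileImpl;
  import com.datatorrent.netlet.util.Slice;

  public class FileAccessSketch
  {
    public static void main(String[] args) throws Exception
    {
      // Hypothetical store rooted at an arbitrary local path.
      TFileImpl.DTFileImpl store = new TFileImpl.DTFileImpl();
      store.setBasePath("/tmp/fileaccess-demo");
      store.init();

      // Write into bucket 1 under a temporary name, then publish atomically.
      FileAccess.FileWriter writer = store.getWriter(1L, "_tmp.tfile");
      writer.append("key-1".getBytes(), "value-1".getBytes());
      writer.close();
      store.rename(1L, "_tmp.tfile", "0.tfile");

      // Scan the file back through the reader abstraction.
      FileAccess.FileReader reader = store.getReader(1L, "0.tfile");
      Slice key = new Slice(null, 0, 0);
      Slice value = new Slice(null, 0, 0);
      while (reader.next(key, value)) {
        System.out.println(new String(key.buffer, key.offset, key.length));
      }
      reader.close();
      store.close();
    }
  }

The temporary-name-then-rename step leans on the atomic rename declared above, so a reader never observes a partially written file.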

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
new file mode 100644
index 0000000..80a201a
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Hadoop file system backed store.
+ *
+ * @since 2.0.0
+ */
+public abstract class FileAccessFSImpl implements FileAccess
+{
+  @NotNull
+  private String basePath;
+  protected transient FileSystem fs;
+
+  public FileAccessFSImpl()
+  {
+  }
+
+  public String getBasePath()
+  {
+    return basePath;
+  }
+
+  public void setBasePath(String path)
+  {
+    this.basePath = path;
+  }
+
+  protected Path getFilePath(long bucketKey, String fileName) {
+    return new Path(getBucketPath(bucketKey), fileName);
+  }
+
+  protected Path getBucketPath(long bucketKey)
+  {
+    return new Path(basePath, Long.toString(bucketKey));
+  }
+
+  @Override
+  public long getFileSize(long bucketKey, String fileName) throws IOException {
+    return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen();
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    fs.close();
+  }
+
+  @Override
+  public void init()
+  {
+    if (fs == null) {
+      Path dataFilePath = new Path(basePath);
+      try {
+        fs = FileSystem.newInstance(dataFilePath.toUri(), new Configuration());
+      } catch (IOException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+  }
+
+  @Override
+  public void delete(long bucketKey, String fileName) throws IOException
+  {
+    fs.delete(getFilePath(bucketKey, fileName), true);
+  }
+
+  @Override
+  public FSDataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException
+  {
+    Path path = getFilePath(bucketKey, fileName);
+    return fs.create(path, true);
+  }
+
+  @Override
+  public FSDataInputStream getInputStream(long bucketKey, String fileName) throws IOException
+  {
+    return fs.open(getFilePath(bucketKey, fileName));
+  }
+
+  @Override
+  public void rename(long bucketKey, String fromName, String toName) throws IOException
+  {
+    FileContext fc = FileContext.getFileContext(fs.getUri());
+    Path bucketPath = getBucketPath(bucketKey);
+    // file context requires absolute path
+    if (!bucketPath.isAbsolute()) {
+      bucketPath = new Path(fs.getWorkingDirectory(), bucketPath);
+    }
+    fc.rename(new Path(bucketPath, fromName), new Path(bucketPath, toName), Rename.OVERWRITE);
+  }
+
+  @Override
+  public String toString()
+  {
+    return this.getClass().getSimpleName() + "[basePath=" + basePath + "]";
+  }
+
+}
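
Files are laid out as <basePath>/<bucketKey>/<fileName>. Below is a short wiring sketch against a concrete subclass from this commit; the HDFS URI and bucket contents are assumptions for the example.

  import com.datatorrent.lib.fileaccess.TFileImpl;

  public class FSLayoutSketch
  {
    public static void main(String[] args) throws Exception
    {
      TFileImpl.DefaultTFileImpl fa = new TFileImpl.DefaultTFileImpl();
      fa.setBasePath("hdfs://namenode:8020/user/demo/store"); // hypothetical
      fa.init(); // creates the FileSystem from the base path URI on first use
      // Bucket 42's files live under .../store/42, e.g. .../store/42/5.tfile
      fa.delete(42L, "5.tfile");
      fa.close();
    }
  }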

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
new file mode 100644
index 0000000..5526832
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.file.tfile.DTFile;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+/**
+ * A TFile wrapper for the FileAccess API.
+ * <ul>
+ * <li>{@link TFileImpl.DefaultTFileImpl} returns the default TFile {@link Reader} and {@link Writer} for IO operations</li>
+ * <li>{@link TFileImpl.DTFileImpl} returns the DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader} (which is faster than the default TFile reader) and the same {@link Writer} for IO operations</li>
+ * </ul>
+ *
+ * @since 2.0.0
+ */
+public abstract class TFileImpl extends FileAccessFSImpl
+{
+  private int minBlockSize = 64 * 1024;
+
+  private String compressName = TFile.COMPRESSION_NONE;
+  
+  private String comparator = "memcmp";
+  
+  private int chunkSize = 1024 * 1024;
+  
+  private int inputBufferSize = 256 * 1024;
+  
+  private int outputBufferSize = 256 * 1024;
+
+  
+  private void setupConfig(Configuration conf)
+  {
+    conf.set("tfile.io.chunk.size", String.valueOf(chunkSize));
+    conf.set("tfile.fs.input.buffer.size", String.valueOf(inputBufferSize));
+    conf.set("tfile.fs.output.buffer.size", String.valueOf(outputBufferSize));
+  }
+
+
+  @Override
+  public FileWriter getWriter(long bucketKey, String fileName) throws IOException
+  {
+    FSDataOutputStream fsdos = getOutputStream(bucketKey, fileName);
+    setupConfig(fs.getConf());
+    return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf());
+  }
+  
+  public int getMinBlockSize()
+  {
+    return minBlockSize;
+  }
+
+
+  public void setMinBlockSize(int minBlockSize)
+  {
+    this.minBlockSize = minBlockSize;
+  }
+
+
+  public String getCompressName()
+  {
+    return compressName;
+  }
+
+
+  public void setCompressName(String compressName)
+  {
+    this.compressName = compressName;
+  }
+
+
+  public String getComparator()
+  {
+    return comparator;
+  }
+
+
+  public void setComparator(String comparator)
+  {
+    this.comparator = comparator;
+  }
+
+
+  public int getChunkSize()
+  {
+    return chunkSize;
+  }
+
+
+  public void setChunkSize(int chunkSize)
+  {
+    this.chunkSize = chunkSize;
+  }
+
+
+  public int getInputBufferSize()
+  {
+    return inputBufferSize;
+  }
+
+
+  public void setInputBufferSize(int inputBufferSize)
+  {
+    this.inputBufferSize = inputBufferSize;
+  }
+
+
+  public int getOutputBufferSize()
+  {
+    return outputBufferSize;
+  }
+
+
+  public void setOutputBufferSize(int outputBufferSize)
+  {
+    this.outputBufferSize = outputBufferSize;
+  }
+  
+  /**
+   * Returns the default {@link TFile} {@link Reader}.
+   *
+   */
+  public static class DefaultTFileImpl extends TFileImpl {
+    
+    @Override
+    public FileReader getReader(long bucketKey, String fileName) throws IOException
+    {
+      FSDataInputStream fsdis =  getInputStream(bucketKey, fileName);
+      long fileLength = getFileSize(bucketKey, fileName);
+      super.setupConfig(fs.getConf());
+      return new TFileReader(fsdis, fileLength, fs.getConf());
+    }
+    
+  }
+  
+  
+  /**
+   * Returns the {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}.
+   *
+   */
+  public static class DTFileImpl extends TFileImpl {
+    
+    @Override
+    public FileReader getReader(long bucketKey, String fileName) throws IOException
+    {
+      FSDataInputStream fsdis =  getInputStream(bucketKey, fileName);
+      long fileLength = getFileSize(bucketKey, fileName);
+      super.setupConfig(fs.getConf());
+      return new DTFileReader(fsdis, fileLength, fs.getConf());
+    }
+    
+  }
+
+
+}
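
A configuration sketch for the two variants; the base path and sizes are arbitrary, and COMPRESSION_GZ assumes the gzip codec is available on the cluster.

  import org.apache.hadoop.io.file.tfile.TFile;

  import com.datatorrent.lib.fileaccess.TFileImpl;

  public class TFileConfigSketch
  {
    public static void main(String[] args)
    {
      // DTFileImpl reads through the faster DTFile reader; DefaultTFileImpl uses plain TFile.
      TFileImpl store = new TFileImpl.DTFileImpl();
      store.setBasePath("/tmp/tfile-store");       // hypothetical
      store.setMinBlockSize(128 * 1024);           // TFile compression block size
      store.setCompressName(TFile.COMPRESSION_GZ); // or COMPRESSION_NONE (the default here)
      store.setComparator("memcmp");               // raw byte-order keys
      store.setChunkSize(1024 * 1024);             // becomes tfile.io.chunk.size
      store.init();
    }
  }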

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
new file mode 100644
index 0000000..8426c3f
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * TFile based implementation of {@link FileAccess.FileReader}.
+ *
+ * @since 2.0.0
+ */
+public class TFileReader implements FileAccess.FileReader
+{
+
+  private final Reader reader;
+  private final Scanner scanner;
+  private final FSDataInputStream fsdis;
+  private boolean closed = false;
+
+  public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
+  {
+    this.fsdis = fsdis;
+    reader = new Reader(fsdis, fileLength, conf);
+    scanner = reader.createScanner();
+  }
+
+  /**
+   * Unlike the TFile.Reader.close method, this also closes the wrapped InputStream.
+   * @see java.io.Closeable#close()
+   */
+  @Override
+  public void close() throws IOException
+  {
+    closed = true;
+    scanner.close();
+    reader.close();
+    fsdis.close();
+  }
+
+  @Override
+  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
+  {
+    scanner.rewind();
+    for (; !scanner.atEnd(); scanner.advance()) {
+      Entry en = scanner.entry();
+      int klen = en.getKeyLength();
+      int vlen = en.getValueLength();
+      byte[] key = new byte[klen];
+      byte[] value = new byte[vlen];
+      en.getKey(key);
+      en.getValue(value);
+      data.put(new Slice(key, 0, key.length), value);
+    }
+
+  }
+
+  @Override
+  public void reset() throws IOException
+  {
+    scanner.rewind();
+  }
+
+  @Override
+  public boolean seek(Slice key) throws IOException
+  {
+    try {
+      return scanner.seekTo(key.buffer, key.offset, key.length);
+    } catch (NullPointerException ex) {
+      if (closed) {
+        throw new IOException("Stream was closed");
+      }
+      throw ex;
+    }
+  }
+
+  @Override
+  public boolean next(Slice key, Slice value) throws IOException
+  {
+    if (scanner.atEnd()) return false;
+    Entry en = scanner.entry();
+    byte[] rkey = new byte[en.getKeyLength()];
+    byte[] rval = new byte[en.getValueLength()];
+    en.getKey(rkey);
+    en.getValue(rval);
+
+    key.buffer = rkey;
+    key.offset = 0;
+    key.length = en.getKeyLength();
+
+    value.buffer = rval;
+    value.offset = 0;
+    value.length = en.getValueLength();
+
+    scanner.advance();
+    return true;
+  }
+
+}
+
+
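
A bulk-load sketch for readFully; since the map is keyed by Slice, the byte-wise comparator below is an assumption about what a caller supplies (the reader itself imposes no ordering).

  import java.io.IOException;
  import java.util.Comparator;
  import java.util.TreeMap;

  import com.datatorrent.lib.fileaccess.FileAccess;
  import com.datatorrent.netlet.util.Slice;

  public class ReadFullySketch
  {
    // Unsigned, lexicographic byte-wise ordering over Slice contents.
    static final Comparator<Slice> MEMCMP = new Comparator<Slice>()
    {
      @Override
      public int compare(Slice a, Slice b)
      {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
          int d = (a.buffer[a.offset + i] & 0xff) - (b.buffer[b.offset + i] & 0xff);
          if (d != 0) {
            return d;
          }
        }
        return a.length - b.length;
      }
    };

    public static TreeMap<Slice, byte[]> load(FileAccess.FileReader reader) throws IOException
    {
      TreeMap<Slice, byte[]> data = new TreeMap<Slice, byte[]>(MEMCMP);
      reader.readFully(data); // copies every key/value pair out of the file
      return data;
    }
  }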

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
new file mode 100644
index 0000000..b362987
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+/**
+ * TFile based implementation of {@link FileAccess.FileWriter}.
+ *
+ * @since 2.0.0
+ */
+public final class TFileWriter implements FileAccess.FileWriter
+{
+  private Writer writer;
+  
+  private FSDataOutputStream fsdos;
+  
+  public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException
+  {
+    this.fsdos = stream;
+    writer = new Writer(stream, minBlockSize, compressName, comparator, conf);
+    
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    writer.close();
+    fsdos.close();
+  }
+
+  @Override
+  public void append(byte[] key, byte[] value) throws IOException
+  {
+    writer.append(key, value);
+  }
+
+  @Override
+  public long getBytesWritten() throws IOException { return fsdos.getPos(); }
+
+}
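
A write-side sketch; the output stream is assumed to come from FileAccessFSImpl.getOutputStream, and TFile requires keys to be appended in non-decreasing comparator order.

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.io.file.tfile.TFile;

  import com.datatorrent.lib.fileaccess.TFileWriter;

  public class TFileWriteSketch
  {
    public static void write(FSDataOutputStream out, Configuration conf) throws IOException
    {
      TFileWriter writer = new TFileWriter(out, 64 * 1024, TFile.COMPRESSION_NONE, "memcmp", conf);
      // With the "memcmp" comparator, keys must arrive in ascending unsigned byte order.
      writer.append("a".getBytes(), "1".getBytes());
      writer.append("b".getBytes(), "2".getBytes());
      System.out.println(writer.getBytesWritten()); // position of the wrapped stream
      writer.close(); // closes the wrapped output stream as well
    }
  }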

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 92466ab..678540d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,7 +144,7 @@
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-checkstyle-plugin</artifactId>
             <configuration>
-              <maxAllowedViolations>8768</maxAllowedViolations>
+              <maxAllowedViolations>8789</maxAllowedViolations>
             </configuration>
           </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/DTFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/DTFileReader.java b/tfile/DTFileReader.java
deleted file mode 100644
index e61d475..0000000
--- a/tfile/DTFileReader.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.TreeMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.file.tfile.DTFile;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
-import org.apache.hadoop.io.file.tfile.TFile;
-
-import com.datatorrent.netlet.util.Slice;
-import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
-
-/**
- * {@link DTFile} wrapper for HDSFileReader
- * <br>
- * {@link DTFile} has the exact same format as {@link TFile} but provides a much faster {@link Reader} implementation.
- * <br>
- * DTFileReader is therefore fully compatible with any file generated by {@link TFileWriter}, so there is no corresponding "DTFileWriter".
- *
- *
- * @since 2.0.0
- */
-public class DTFileReader implements HDSFileReader
-{
-  private final Reader reader;
-  private final Scanner scanner;
-  private final FSDataInputStream fsdis;
-
-  public DTFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
-  {
-    this.fsdis = fsdis;
-    reader = new Reader(fsdis, fileLength, conf);
-    scanner = reader.createScanner();
-  }
-
-  /**
-   * Unlike the TFile.Reader.close method, this also closes the wrapped InputStream.
-   * @see java.io.Closeable#close()
-   */
-  @Override
-  public void close() throws IOException
-  {
-    scanner.close();
-    reader.close();
-    fsdis.close();
-  }
-
-  @Override
-  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
-  {
-    scanner.rewind();
-    for (; !scanner.atEnd(); scanner.advance()) {
-      Entry en = scanner.entry();
-      Slice key = new Slice(en.getBlockBuffer(), en.getKeyOffset(), en.getKeyLength());
-      byte[] value = Arrays.copyOfRange(en.getBlockBuffer(), en.getValueOffset(), en.getValueOffset() + en.getValueLength());
-      data.put(key, value);
-    }
-
-  }
-
-  @Override
-  public void reset() throws IOException
-  {
-    scanner.rewind();
-  }
-
-  @Override
-  public boolean seek(Slice key) throws IOException
-  {
-    return scanner.seekTo(key.buffer, key.offset, key.length);
-  }
-
-  @Override
-  public boolean next(Slice key, Slice value) throws IOException
-  {
-    if (scanner.atEnd()) return false;
-    Entry en = scanner.entry();
-
-    key.buffer = en.getBlockBuffer();
-    key.offset = en.getKeyOffset();
-    key.length = en.getKeyLength();
-
-    value.buffer = en.getBlockBuffer();
-    value.offset = en.getValueOffset();
-    value.length = en.getValueLength();
-
-    scanner.advance();
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileImpl.java
----------------------------------------------------------------------
diff --git a/tfile/TFileImpl.java b/tfile/TFileImpl.java
deleted file mode 100644
index 5dc9464..0000000
--- a/tfile/TFileImpl.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.file.tfile.DTFile;
-import org.apache.hadoop.io.file.tfile.TFile;
-import org.apache.hadoop.io.file.tfile.TFile.Reader;
-import org.apache.hadoop.io.file.tfile.TFile.Writer;
-
-import com.datatorrent.contrib.hdht.HDHTFileAccessFSImpl;
-
-/**
- * A TFile wrapper for the HDHTFileAccess API.
- * <ul>
- * <li>{@link TFileImpl.DefaultTFileImpl} returns the default TFile {@link Reader} and {@link Writer} for IO operations</li>
- * <li>{@link TFileImpl.DTFileImpl} returns the DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader} (which is faster than the default TFile reader) and the same {@link Writer} for IO operations</li>
- * </ul>
- *
- * @since 2.0.0
- */
-public abstract class TFileImpl extends HDHTFileAccessFSImpl
-{
-  private int minBlockSize = 64 * 1024;
-
-  private String compressName = TFile.COMPRESSION_NONE;
-  
-  private String comparator = "memcmp";
-  
-  private int chunkSize = 1024 * 1024;
-  
-  private int inputBufferSize = 256 * 1024;
-  
-  private int outputBufferSize = 256 * 1024;
-
-  
-  private void setupConfig(Configuration conf)
-  {
-    conf.set("tfile.io.chunk.size", String.valueOf(chunkSize));
-    conf.set("tfile.fs.input.buffer.size", String.valueOf(inputBufferSize));
-    conf.set("tfile.fs.output.buffer.size", String.valueOf(outputBufferSize));
-  }
-
-
-  @Override
-  public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException
-  {
-    FSDataOutputStream fsdos = getOutputStream(bucketKey, fileName);
-    setupConfig(fs.getConf());
-    return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf());
-  }
-  
-  public int getMinBlockSize()
-  {
-    return minBlockSize;
-  }
-
-
-  public void setMinBlockSize(int minBlockSize)
-  {
-    this.minBlockSize = minBlockSize;
-  }
-
-
-  public String getCompressName()
-  {
-    return compressName;
-  }
-
-
-  public void setCompressName(String compressName)
-  {
-    this.compressName = compressName;
-  }
-
-
-  public String getComparator()
-  {
-    return comparator;
-  }
-
-
-  public void setComparator(String comparator)
-  {
-    this.comparator = comparator;
-  }
-
-
-  public int getChunkSize()
-  {
-    return chunkSize;
-  }
-
-
-  public void setChunkSize(int chunkSize)
-  {
-    this.chunkSize = chunkSize;
-  }
-
-
-  public int getInputBufferSize()
-  {
-    return inputBufferSize;
-  }
-
-
-  public void setInputBufferSize(int inputBufferSize)
-  {
-    this.inputBufferSize = inputBufferSize;
-  }
-
-
-  public int getOutputBufferSize()
-  {
-    return outputBufferSize;
-  }
-
-
-  public void setOutputBufferSize(int outputBufferSize)
-  {
-    this.outputBufferSize = outputBufferSize;
-  }
-  
-  /**
-   * Returns the default {@link TFile} {@link Reader}.
-   *
-   */
-  public static class DefaultTFileImpl extends TFileImpl {
-    
-    @Override
-    public HDSFileReader getReader(long bucketKey, String fileName) throws IOException
-    {
-      FSDataInputStream fsdis =  getInputStream(bucketKey, fileName);
-      long fileLength = getFileSize(bucketKey, fileName);
-      super.setupConfig(fs.getConf());
-      return new TFileReader(fsdis, fileLength, fs.getConf());
-    }
-    
-  }
-  
-  
-  /**
-   * Returns the {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}.
-   *
-   */
-  public static class DTFileImpl extends TFileImpl {
-    
-    @Override
-    public HDSFileReader getReader(long bucketKey, String fileName) throws IOException
-    {
-      FSDataInputStream fsdis =  getInputStream(bucketKey, fileName);
-      long fileLength = getFileSize(bucketKey, fileName);
-      super.setupConfig(fs.getConf());
-      return new DTFileReader(fsdis, fileLength, fs.getConf());
-    }
-    
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java
deleted file mode 100644
index 0994666..0000000
--- a/tfile/TFileReader.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-import java.util.TreeMap;
-
-import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.netlet.util.Slice;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.file.tfile.TFile.Reader;
-import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
-import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry;
-
-import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
-
-/**
- * TFileReader
- *
- * @since 2.0.0
- */
-public class TFileReader implements HDSFileReader
-{
-
-  private final Reader reader;
-  private final Scanner scanner;
-  private final FSDataInputStream fsdis;
-  private boolean closed = false;
-
-  public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
-  {
-    this.fsdis = fsdis;
-    reader = new Reader(fsdis, fileLength, conf);
-    scanner = reader.createScanner();
-  }
-
-  /**
-   * Unlike the TFile.Reader.close method, this also closes the wrapped InputStream.
-   * @see java.io.Closeable#close()
-   */
-  @Override
-  public void close() throws IOException
-  {
-    closed = true;
-    scanner.close();
-    reader.close();
-    fsdis.close();
-  }
-
-  @Override
-  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
-  {
-    scanner.rewind();
-    for (; !scanner.atEnd(); scanner.advance()) {
-      Entry en = scanner.entry();
-      int klen = en.getKeyLength();
-      int vlen = en.getValueLength();
-      byte[] key = new byte[klen];
-      byte[] value = new byte[vlen];
-      en.getKey(key);
-      en.getValue(value);
-      data.put(new Slice(key, 0, key.length), value);
-    }
-
-  }
-
-  @Override
-  public void reset() throws IOException
-  {
-    scanner.rewind();
-  }
-
-  @Override
-  public boolean seek(Slice key) throws IOException
-  {
-    try {
-      return scanner.seekTo(key.buffer, key.offset, key.length);
-    } catch (NullPointerException ex) {
-      if (closed)
-        throw new IOException("Stream was closed");
-      else
-        throw ex;
-    }
-  }
-
-  @Override
-  public boolean next(Slice key, Slice value) throws IOException
-  {
-    if (scanner.atEnd()) return false;
-    Entry en = scanner.entry();
-    byte[] rkey = new byte[en.getKeyLength()];
-    byte[] rval = new byte[en.getValueLength()];
-    en.getKey(rkey);
-    en.getValue(rval);
-
-    key.buffer = rkey;
-    key.offset = 0;
-    key.length = en.getKeyLength();
-
-    value.buffer = rval;
-    value.offset = 0;
-    value.length = en.getValueLength();
-
-    scanner.advance();
-    return true;
-  }
-
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileWriter.java
----------------------------------------------------------------------
diff --git a/tfile/TFileWriter.java b/tfile/TFileWriter.java
deleted file mode 100644
index 549e1b8..0000000
--- a/tfile/TFileWriter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.file.tfile.TFile.Writer;
-
-import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileWriter;
-
-/**
- * TFileWriter
- *
- * @since 2.0.0
- */
-public final class TFileWriter implements HDSFileWriter
-{
-  private Writer writer;
-  
-  private FSDataOutputStream fsdos;
-  
-  public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException
-  {
-    this.fsdos = stream;
-    writer = new Writer(stream, minBlockSize, compressName, comparator, conf);
-    
-  }
-
-  @Override
-  public void close() throws IOException
-  {
-    writer.close();
-    fsdos.close();
-  }
-
-  @Override
-  public void append(byte[] key, byte[] value) throws IOException
-  {
-    writer.append(key, value);
-  }
-
-  @Override
-  public long getBytesWritten() throws IOException{ return fsdos.getPos(); }
-
-}


[7/7] incubator-apex-malhar git commit: Merge branch 'MLHR-1916' of https://github.com/chandnisingh/incubator-apex-malhar into devel-3

Posted by hs...@apache.org.
Merge branch 'MLHR-1916' of https://github.com/chandnisingh/incubator-apex-malhar into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/333a7073
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/333a7073
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/333a7073

Branch: refs/heads/devel-3
Commit: 333a7073308a7dc5beecb50b731d0904ecc1da24
Parents: e24c14c 7d2f474
Author: Siyuan Hua <hs...@apache.org>
Authored: Mon Nov 30 13:27:55 2015 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Mon Nov 30 13:27:55 2015 -0800

----------------------------------------------------------------------
 .../lib/fileaccess/DTFileReader.java            | 112 ++++++++++++
 .../datatorrent/lib/fileaccess/FileAccess.java  | 129 ++++++++++++++
 .../lib/fileaccess/FileAccessFSImpl.java        | 130 ++++++++++++++
 .../datatorrent/lib/fileaccess/TFileImpl.java   | 178 +++++++++++++++++++
 .../datatorrent/lib/fileaccess/TFileReader.java | 125 +++++++++++++
 .../datatorrent/lib/fileaccess/TFileWriter.java |  61 +++++++
 pom.xml                                         |   2 +-
 7 files changed, 736 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/333a7073/pom.xml
----------------------------------------------------------------------