You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2015/11/30 22:28:26 UTC
[1/7] incubator-apex-malhar git commit: Rename HDS to HDHT.
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 e24c14c3e -> 333a70733
Rename HDS to HDHT.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/4e47d236
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/4e47d236
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/4e47d236
Branch: refs/heads/devel-3
Commit: 4e47d236cbd8a1ca3c6dd96235cd588a7eb1d2e6
Parents: 217f8db
Author: thomas <th...@datatorrent.com>
Authored: Mon Dec 8 15:22:57 2014 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 23 21:37:01 2015 -0800
----------------------------------------------------------------------
AbstractSinglePortHDSWriter.java | 194 ++++++++++++++++++++++++++++++++++
HDHTFileAccess.java | 122 +++++++++++++++++++++
HDHTFileAccessFSImpl.java | 125 ++++++++++++++++++++++
tfile/DTFileReader.java | 110 +++++++++++++++++++
tfile/TFileImpl.java | 176 ++++++++++++++++++++++++++++++
tfile/TFileReader.java | 110 +++++++++++++++++++
tfile/TFileWriter.java | 55 ++++++++++
7 files changed, 892 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/AbstractSinglePortHDSWriter.java
----------------------------------------------------------------------
diff --git a/AbstractSinglePortHDSWriter.java b/AbstractSinglePortHDSWriter.java
new file mode 100644
index 0000000..04fa602
--- /dev/null
+++ b/AbstractSinglePortHDSWriter.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.common.util.Slice;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
+
+/**
+ * Operator that receives data on its input port and writes it to the data store.
+ * Implements partitioning and maps the partition key to the store bucket.
+ * The derived class supplies the codec for partitioning and key-value serialization.
+ *
+ * @param <EVENT> type of the events received on the input port
+ */
+public abstract class AbstractSinglePortHDSWriter<EVENT> extends HDHTWriter implements Partitioner<AbstractSinglePortHDSWriter<EVENT>>
+{
+  /**
+   * Stream codec that additionally converts events to and from key/value bytes.
+   *
+   * @param <EVENT> event type
+   */
+  public interface HDSCodec<EVENT> extends StreamCodec<EVENT>
+  {
+    /** @return the key bytes of the event */
+    byte[] getKeyBytes(EVENT event);
+
+    /** @return the value bytes of the event */
+    byte[] getValueBytes(EVENT event);
+
+    /** @return an event built from the given key and value */
+    EVENT fromKeyValue(Slice key, byte[] value);
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractSinglePortHDSWriter.class);
+
+  /** Mask applied to the codec partition value to obtain the bucket key; assigned in definePartitions. */
+  protected int partitionMask;
+
+  /** Partition keys of this instance; assigned in definePartitions. */
+  protected Set<Integer> partitions;
+
+  /** Codec obtained from {@link #getCodec()} in {@link #setup}. */
+  protected transient HDSCodec<EVENT> codec;
+
+  @Min(1)
+  private int partitionCount = 1;
+
+  /** Input port; every received event is handed to {@link #processEvent}. */
+  public final transient DefaultInputPort<EVENT> input = new DefaultInputPort<EVENT>()
+  {
+    @Override
+    public void process(EVENT event)
+    {
+      try {
+        processEvent(event);
+      } catch (IOException e) {
+        throw new RuntimeException("Error processing " + event, e);
+      }
+    }
+
+    @Override
+    public StreamCodec<EVENT> getStreamCodec()
+    {
+      return getCodec();
+    }
+  };
+
+  /**
+   * Sets the number of partitions created on initial partitioning when no incremental capacity is requested.
+   *
+   * @param partitionCount number of partitions, at least 1
+   */
+  public void setPartitionCount(int partitionCount)
+  {
+    this.partitionCount = partitionCount;
+  }
+
+  /**
+   * @return number of partitions created on initial partitioning when no incremental capacity is requested
+   */
+  public int getPartitionCount()
+  {
+    return partitionCount;
+  }
+
+  /**
+   * Storage bucket for the given event. Only one partition can write to a storage bucket and by default it is
+   * identified by the partition id.
+   *
+   * @param event the incoming event
+   * @return The bucket key.
+   */
+  protected long getBucketKey(EVENT event)
+  {
+    return (codec.getPartition(event) & partitionMask);
+  }
+
+  /**
+   * Serializes the event with the codec and writes it to the bucket returned by {@link #getBucketKey}.
+   *
+   * @param event the event to store
+   * @throws IOException if the write fails
+   */
+  protected void processEvent(EVENT event) throws IOException
+  {
+    byte[] key = codec.getKeyBytes(event);
+    byte[] value = codec.getValueBytes(event);
+    super.put(getBucketKey(event), new Slice(key), value);
+  }
+
+  /**
+   * @return the codec used for partitioning and key/value serialization
+   */
+  abstract protected HDSCodec<EVENT> getCodec();
+
+  @Override
+  public void setup(OperatorContext arg0)
+  {
+    LOG.debug("Store {} with partitions {}", super.getFileStore(), new PartitionKeys(this.partitionMask, this.partitions));
+    super.setup(arg0);
+    try {
+      this.codec = getCodec();
+      // inject the operator reference, if such field exists
+      // TODO: replace with broader solution
+      // NOTE(review): isAssignableFrom also matches fields typed as Object or as any interface this operator
+      // implements, including static fields - confirm that is intended.
+      for (Class<?> cls = this.codec.getClass(); cls != null; cls = cls.getSuperclass()) {
+        for (Field field : cls.getDeclaredFields()) {
+          if (field.getType().isAssignableFrom(this.getClass())) {
+            field.setAccessible(true);
+            field.set(this.codec, this);
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create codec", e);
+    }
+  }
+
+  /**
+   * Creates the initial partitions by cloning this operator and assigning each clone its partition keys.
+   * Repartitioning of an already partitioned operator is not supported and returns the partitions unchanged.
+   */
+  @Override
+  public Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> definePartitions(Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> partitions, int incrementalCapacity)
+  {
+    boolean isInitialPartition = partitions.iterator().next().getStats() == null;
+
+    if (!isInitialPartition) {
+      // support for dynamic partitioning requires lineage tracking
+      LOG.warn("Dynamic partitioning not implemented");
+      return partitions;
+    }
+
+    // a non-zero incremental capacity (parallel partitioning) overrides the configured partition count
+    int totalCount = (incrementalCapacity != 0) ? incrementalCapacity : partitionCount;
+
+    // Kryo.copy fails as it attempts to clone transient fields (input port), so clone through a
+    // serialize/deserialize round trip. This operator does not change inside the loop, so serialize it once.
+    Kryo lKryo = new Kryo();
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    Output output = new Output(bos);
+    lKryo.writeObject(output, this);
+    output.close();
+    byte[] serializedOperator = bos.toByteArray();
+
+    Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount);
+    for (int i = 0; i < totalCount; i++) {
+      Input lInput = new Input(serializedOperator);
+      @SuppressWarnings("unchecked")
+      AbstractSinglePortHDSWriter<EVENT> oper = lKryo.readObject(lInput, this.getClass());
+      newPartitions.add(new DefaultPartition<AbstractSinglePortHDSWriter<EVENT>>(oper));
+    }
+
+    // assign the partition keys
+    DefaultPartition.assignPartitionKeys(newPartitions, input);
+
+    for (Partition<AbstractSinglePortHDSWriter<EVENT>> p : newPartitions) {
+      PartitionKeys pks = p.getPartitionKeys().get(input);
+      p.getPartitionedInstance().partitionMask = pks.mask;
+      p.getPartitionedInstance().partitions = pks.partitions;
+    }
+
+    return newPartitions;
+  }
+
+  @Override
+  public void partitioned(Map<Integer, Partition<AbstractSinglePortHDSWriter<EVENT>>> arg0)
+  {
+    // nothing to do
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/HDHTFileAccess.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccess.java b/HDHTFileAccess.java
new file mode 100644
index 0000000..fc3d56f
--- /dev/null
+++ b/HDHTFileAccess.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.TreeMap;
+
+import com.datatorrent.common.util.Slice;
+
+/**
+ * Abstraction for file system and format interaction.
+ * <p>
+ * Files are addressed by a bucket key and a file name within that bucket.
+ */
+public interface HDHTFileAccess extends Closeable
+{
+  /**
+   * Prepares the underlying storage for use; implementations may require this before other methods are called.
+   */
+  void init();
+
+  /**
+   * Opens a stream for writing the given file of a bucket.
+   *
+   * @param bucketKey bucket the file belongs to
+   * @param fileName name of the file within the bucket
+   * @return output stream for the file
+   * @throws IOException if the file cannot be opened
+   */
+  DataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * Opens a stream for reading the given file of a bucket.
+   *
+   * @param bucketKey bucket the file belongs to
+   * @param fileName name of the file within the bucket
+   * @return input stream for the file
+   * @throws IOException if the file cannot be opened
+   */
+  DataInputStream getInputStream(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * Atomic file rename.
+   *
+   * @param bucketKey bucket the file belongs to
+   * @param oldName current file name
+   * @param newName new file name
+   * @throws IOException if the rename fails
+   */
+  void rename(long bucketKey, String oldName, String newName) throws IOException;
+
+  /**
+   * Deletes the given file of a bucket.
+   *
+   * @param bucketKey bucket the file belongs to
+   * @param fileName name of the file within the bucket
+   * @throws IOException if the delete fails
+   */
+  void delete(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * Returns the size of the given file of a bucket.
+   *
+   * @param bucketKey bucket the file belongs to
+   * @param fileName name of the file within the bucket
+   * @return the file size
+   * @throws IOException if the file status cannot be read
+   */
+  long getFileSize(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * HDHT Data File Format Reader
+   */
+  interface HDSFileReader extends Closeable
+  {
+    /**
+     * Read the entire contents of the underlying file into a TreeMap structure
+     *
+     * @param data map receiving all key/value pairs of the file
+     * @throws IOException if reading fails
+     */
+    // TODO: migrate to readFully(TreeMap<Slice, Slice> data)
+    void readFully(TreeMap<Slice, byte[]> data) throws IOException;
+
+    /**
+     * Repositions the pointer to the beginning of the underlying file.
+     *
+     * @throws IOException if repositioning fails
+     */
+    void reset() throws IOException;
+
+    /**
+     * Searches for a matching key, and positions the pointer before the start of the key.
+     *
+     * @param key the key to search for
+     * @return true if a given key is found
+     * @throws IOException if the search fails
+     */
+    boolean seek(Slice key) throws IOException;
+
+    /**
+     * Reads next available key/value pair starting from the current pointer position
+     * into Slice objects and advances pointer to next key. If pointer is at the end
+     * of the file, false is returned, and Slice objects remains unmodified.
+     *
+     * @param key Empty slice object
+     * @param value Empty slice object
+     * @return True if key/value were successfully read, false otherwise
+     * @throws IOException if reading fails
+     */
+    boolean next(Slice key, Slice value) throws IOException;
+
+  }
+
+  /**
+   * HDHT Data File Format Writer
+   */
+  interface HDSFileWriter extends Closeable
+  {
+    /**
+     * Appends key/value pair to the underlying file.
+     *
+     * @param key key bytes
+     * @param value value bytes
+     * @throws IOException if writing fails
+     */
+    void append(byte[] key, byte[] value) throws IOException;
+
+    /**
+     * Returns number of bytes written to the underlying stream.
+     *
+     * @return The bytes written.
+     * @throws IOException if the position cannot be determined
+     */
+    long getBytesWritten() throws IOException;
+  }
+
+  /**
+   * Obtain a reader for the given data file. Since existing file formats may depend on the file system directly (vs.
+   * work just based on InputStream), construction of the reader is part of the file system abstraction itself.
+   */
+  HDSFileReader getReader(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * Obtain a writer for the given data file. Since existing file formats may depend on the file system directly (vs.
+   * work just based on OutputStream), construction of the writer is part of the file system abstraction itself.
+   */
+  HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/HDHTFileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccessFSImpl.java b/HDHTFileAccessFSImpl.java
new file mode 100644
index 0000000..ad9aa05
--- /dev/null
+++ b/HDHTFileAccessFSImpl.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht;
+
+import java.io.IOException;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.common.util.DTThrowable;
+
+/**
+ * Hadoop file system backed store.
+ * <p>
+ * A file of a bucket is stored at {@code <basePath>/<bucketKey>/<fileName>}.
+ */
+abstract public class HDHTFileAccessFSImpl implements HDHTFileAccess
+{
+  @NotNull
+  private String basePath;
+  protected transient FileSystem fs;
+
+  public HDHTFileAccessFSImpl()
+  {
+  }
+
+  /**
+   * @return root directory under which the bucket directories are located
+   */
+  public String getBasePath()
+  {
+    return basePath;
+  }
+
+  /**
+   * @param path root directory under which the bucket directories are located
+   */
+  public void setBasePath(String path)
+  {
+    this.basePath = path;
+  }
+
+  /**
+   * @return path of {@code fileName} inside the directory of the given bucket
+   */
+  protected Path getFilePath(long bucketKey, String fileName)
+  {
+    return new Path(getBucketPath(bucketKey), fileName);
+  }
+
+  /**
+   * @return directory of the given bucket, {@code <basePath>/<bucketKey>}
+   */
+  protected Path getBucketPath(long bucketKey)
+  {
+    return new Path(basePath, Long.toString(bucketKey));
+  }
+
+  @Override
+  public long getFileSize(long bucketKey, String fileName) throws IOException
+  {
+    return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen();
+  }
+
+  /**
+   * Closes the file system instance created by {@link #init()}, if there is one. The reference is cleared so a
+   * later {@link #init()} creates a new instance instead of keeping the closed one.
+   */
+  @Override
+  public void close() throws IOException
+  {
+    FileSystem toClose = fs;
+    if (toClose != null) {
+      fs = null;
+      toClose.close();
+    }
+  }
+
+  /**
+   * Creates the file system instance for the base path via {@link FileSystem#newInstance} unless one already exists.
+   */
+  @Override
+  public void init()
+  {
+    if (fs == null) {
+      Path dataFilePath = new Path(basePath);
+      try {
+        fs = FileSystem.newInstance(dataFilePath.toUri(), new Configuration());
+      } catch (IOException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+  }
+
+  /**
+   * Deletes the given file (recursively, should the path be a directory).
+   */
+  @Override
+  public void delete(long bucketKey, String fileName) throws IOException
+  {
+    fs.delete(getFilePath(bucketKey, fileName), true);
+  }
+
+  /**
+   * Creates the file for writing, overwriting an existing file of the same name.
+   */
+  @Override
+  public FSDataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException
+  {
+    Path path = getFilePath(bucketKey, fileName);
+    return fs.create(path, true);
+  }
+
+  @Override
+  public FSDataInputStream getInputStream(long bucketKey, String fileName) throws IOException
+  {
+    return fs.open(getFilePath(bucketKey, fileName));
+  }
+
+  /**
+   * Renames a file inside the bucket directory through {@link FileContext}, overwriting an existing target.
+   */
+  @Override
+  public void rename(long bucketKey, String fromName, String toName) throws IOException
+  {
+    FileContext fc = FileContext.getFileContext(fs.getUri());
+    Path bucketPath = getBucketPath(bucketKey);
+    // file context requires absolute path
+    if (!bucketPath.isAbsolute()) {
+      bucketPath = new Path(fs.getWorkingDirectory(), bucketPath);
+    }
+    fc.rename(new Path(bucketPath, fromName), new Path(bucketPath, toName), Rename.OVERWRITE);
+  }
+
+  @Override
+  public String toString()
+  {
+    return this.getClass().getSimpleName() + "[basePath=" + basePath + "]";
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/DTFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/DTFileReader.java b/tfile/DTFileReader.java
new file mode 100644
index 0000000..fefadaf
--- /dev/null
+++ b/tfile/DTFileReader.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.contrib.hdht.tfile;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.file.tfile.DTFile;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
+import org.apache.hadoop.io.file.tfile.TFile;
+
+import com.datatorrent.common.util.Slice;
+import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
+
+/**
+ * {@link DTFile} wrapper for HDSFileReader
+ * <br>
+ * {@link DTFile} has exact same format as {@link TFile} with a much faster {@link Reader} implementation
+ * <br>
+ * DTFileReader is also fully compatible with any file generated by {@link TFileWriter}. So there is no corresponding "DTFileWriter"
+ * <p>
+ * Keys produced by {@link #readFully} and keys and values produced by {@link #next} reference the scanner's
+ * block buffer rather than copies of it.
+ */
+public class DTFileReader implements HDSFileReader
+{
+  private final Reader reader;
+  private final Scanner scanner;
+  private final FSDataInputStream fsdis;
+
+  /**
+   * Opens a DTFile reader over the given stream and creates a scanner for it.
+   *
+   * @param fsdis input stream of the data file; closed by {@link #close()}
+   * @param fileLength length of the data file
+   * @param conf configuration passed to the DTFile reader
+   * @throws IOException if the reader or scanner cannot be created
+   */
+  public DTFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
+  {
+    this.fsdis = fsdis;
+    reader = new Reader(fsdis, fileLength, conf);
+    scanner = reader.createScanner();
+  }
+
+  /**
+   * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
+   * Each resource is closed even if closing a preceding one fails.
+   *
+   * @see java.io.Closeable#close()
+   */
+  @Override
+  public void close() throws IOException
+  {
+    try {
+      scanner.close();
+    } finally {
+      try {
+        reader.close();
+      } finally {
+        fsdis.close();
+      }
+    }
+  }
+
+  /**
+   * Reads all entries from the start of the file into {@code data}. Values are copied out of the block buffer;
+   * keys reference it directly.
+   */
+  @Override
+  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
+  {
+    scanner.rewind();
+    for (; !scanner.atEnd(); scanner.advance()) {
+      Entry en = scanner.entry();
+      byte[] block = en.getBlockBuffer();
+      Slice key = new Slice(block, en.getKeyOffset(), en.getKeyLength());
+      byte[] value = Arrays.copyOfRange(block, en.getValueOffset(), en.getValueOffset() + en.getValueLength());
+      data.put(key, value);
+    }
+  }
+
+  @Override
+  public void reset() throws IOException
+  {
+    scanner.rewind();
+  }
+
+  @Override
+  public boolean seek(Slice key) throws IOException
+  {
+    return scanner.seekTo(key.buffer, key.offset, key.length);
+  }
+
+  /**
+   * Points {@code key} and {@code value} at the current entry inside the block buffer (no copy) and advances.
+   */
+  @Override
+  public boolean next(Slice key, Slice value) throws IOException
+  {
+    if (scanner.atEnd()) {
+      return false;
+    }
+    Entry en = scanner.entry();
+
+    key.buffer = en.getBlockBuffer();
+    key.offset = en.getKeyOffset();
+    key.length = en.getKeyLength();
+
+    value.buffer = en.getBlockBuffer();
+    value.offset = en.getValueOffset();
+    value.length = en.getValueLength();
+
+    scanner.advance();
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/TFileImpl.java
----------------------------------------------------------------------
diff --git a/tfile/TFileImpl.java b/tfile/TFileImpl.java
new file mode 100644
index 0000000..714a5b1
--- /dev/null
+++ b/tfile/TFileImpl.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht.tfile;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.file.tfile.DTFile;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+import com.datatorrent.contrib.hdht.HDHTFileAccessFSImpl;
+
+/**
+ * A TFile wrapper with HDHTFileAccess API
+ * <ul>
+ * <li>{@link TFileImpl.DefaultTFileImpl} return default TFile {@link Reader} and {@link Writer} for IO operations</li>
+ * <li>{@link TFileImpl.DTFileImpl} return DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}(which is faster than default TFile reader) and {@link Writer} for IO operations</li>
+ * </ul>
+ * <p>
+ * The chunk and buffer sizes are written into the file system's configuration ({@code tfile.io.chunk.size},
+ * {@code tfile.fs.input.buffer.size}, {@code tfile.fs.output.buffer.size}) whenever a reader or writer is created.
+ */
+public abstract class TFileImpl extends HDHTFileAccessFSImpl
+{
+  private int minBlockSize = 64 * 1024;
+
+  private String compressName = TFile.COMPRESSION_NONE;
+
+  private String comparator = TFile.COMPARATOR_MEMCMP;
+
+  private int chunkSize = 1024 * 1024;
+
+  private int inputBufferSize = 256 * 1024;
+
+  private int outputBufferSize = 256 * 1024;
+
+  /**
+   * Copies the configured chunk and buffer sizes into the given configuration.
+   */
+  private void setupConfig(Configuration conf)
+  {
+    conf.set("tfile.io.chunk.size", String.valueOf(chunkSize));
+    conf.set("tfile.fs.input.buffer.size", String.valueOf(inputBufferSize));
+    conf.set("tfile.fs.output.buffer.size", String.valueOf(outputBufferSize));
+  }
+
+  /**
+   * Closes a stream whose reader or writer could not be constructed. A failure while closing is attached to
+   * {@code failure} as a suppressed exception so the original cause is not masked.
+   */
+  private static void closeAfterFailure(Closeable stream, Throwable failure)
+  {
+    try {
+      stream.close();
+    } catch (IOException closeError) {
+      failure.addSuppressed(closeError);
+    }
+  }
+
+  /**
+   * Creates a TFile writer over the stream returned by {@link #getOutputStream}. The stream is closed again if
+   * the writer cannot be created.
+   */
+  @Override
+  public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException
+  {
+    // configure before opening so nothing between opening the stream and the guarded section can fail
+    setupConfig(fs.getConf());
+    FSDataOutputStream fsdos = getOutputStream(bucketKey, fileName);
+    try {
+      return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf());
+    } catch (IOException | RuntimeException e) {
+      closeAfterFailure(fsdos, e);
+      throw e;
+    }
+  }
+
+  /** @return minimum block size passed to the TFile writer */
+  public int getMinBlockSize()
+  {
+    return minBlockSize;
+  }
+
+  /** @param minBlockSize minimum block size passed to the TFile writer */
+  public void setMinBlockSize(int minBlockSize)
+  {
+    this.minBlockSize = minBlockSize;
+  }
+
+  /** @return compression algorithm name passed to the TFile writer */
+  public String getCompressName()
+  {
+    return compressName;
+  }
+
+  /** @param compressName compression algorithm name passed to the TFile writer */
+  public void setCompressName(String compressName)
+  {
+    this.compressName = compressName;
+  }
+
+  /** @return key comparator name passed to the TFile writer */
+  public String getComparator()
+  {
+    return comparator;
+  }
+
+  /** @param comparator key comparator name passed to the TFile writer */
+  public void setComparator(String comparator)
+  {
+    this.comparator = comparator;
+  }
+
+  /** @return value written to {@code tfile.io.chunk.size} */
+  public int getChunkSize()
+  {
+    return chunkSize;
+  }
+
+  /** @param chunkSize value written to {@code tfile.io.chunk.size} */
+  public void setChunkSize(int chunkSize)
+  {
+    this.chunkSize = chunkSize;
+  }
+
+  /** @return value written to {@code tfile.fs.input.buffer.size} */
+  public int getInputBufferSize()
+  {
+    return inputBufferSize;
+  }
+
+  /** @param inputBufferSize value written to {@code tfile.fs.input.buffer.size} */
+  public void setInputBufferSize(int inputBufferSize)
+  {
+    this.inputBufferSize = inputBufferSize;
+  }
+
+  /** @return value written to {@code tfile.fs.output.buffer.size} */
+  public int getOutputBufferSize()
+  {
+    return outputBufferSize;
+  }
+
+  /** @param outputBufferSize value written to {@code tfile.fs.output.buffer.size} */
+  public void setOutputBufferSize(int outputBufferSize)
+  {
+    this.outputBufferSize = outputBufferSize;
+  }
+
+  /**
+   * Return {@link TFile} {@link Reader}
+   */
+  public static class DefaultTFileImpl extends TFileImpl
+  {
+    @Override
+    public HDSFileReader getReader(long bucketKey, String fileName) throws IOException
+    {
+      // look up the size and configure before opening, so only the reader construction can fail afterwards
+      long fileLength = getFileSize(bucketKey, fileName);
+      super.setupConfig(fs.getConf());
+      FSDataInputStream fsdis = getInputStream(bucketKey, fileName);
+      try {
+        return new TFileReader(fsdis, fileLength, fs.getConf());
+      } catch (IOException | RuntimeException e) {
+        closeAfterFailure(fsdis, e);
+        throw e;
+      }
+    }
+
+  }
+
+  /**
+   * Return {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}
+   */
+  public static class DTFileImpl extends TFileImpl
+  {
+    @Override
+    public HDSFileReader getReader(long bucketKey, String fileName) throws IOException
+    {
+      // look up the size and configure before opening, so only the reader construction can fail afterwards
+      long fileLength = getFileSize(bucketKey, fileName);
+      super.setupConfig(fs.getConf());
+      FSDataInputStream fsdis = getInputStream(bucketKey, fileName);
+      try {
+        return new DTFileReader(fsdis, fileLength, fs.getConf());
+      } catch (IOException | RuntimeException e) {
+        closeAfterFailure(fsdis, e);
+        throw e;
+      }
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/TFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java
new file mode 100644
index 0000000..d20408c
--- /dev/null
+++ b/tfile/TFileReader.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht.tfile;
+
+import java.io.IOException;
+import java.util.TreeMap;
+
+import com.datatorrent.common.util.Slice;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry;
+
+import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
+
+/**
+ * {@link HDSFileReader} backed by the Hadoop TFile {@link Reader}.
+ * <p>
+ * Keys and values returned by this reader are copied into freshly allocated byte arrays.
+ */
+public class TFileReader implements HDSFileReader
+{
+
+  private final Reader reader;
+  private final Scanner scanner;
+  private final FSDataInputStream fsdis;
+
+  /**
+   * Opens a TFile reader over the given stream and creates a scanner for it.
+   *
+   * @param fsdis input stream of the data file; closed by {@link #close()}
+   * @param fileLength length of the data file
+   * @param conf configuration passed to the TFile reader
+   * @throws IOException if the reader or scanner cannot be created
+   */
+  public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
+  {
+    this.fsdis = fsdis;
+    reader = new Reader(fsdis, fileLength, conf);
+    scanner = reader.createScanner();
+  }
+
+  /**
+   * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
+   * Each resource is closed even if closing a preceding one fails.
+   *
+   * @see java.io.Closeable#close()
+   */
+  @Override
+  public void close() throws IOException
+  {
+    try {
+      scanner.close();
+    } finally {
+      try {
+        reader.close();
+      } finally {
+        fsdis.close();
+      }
+    }
+  }
+
+  /**
+   * Reads all entries from the start of the file into {@code data}, copying each key and value.
+   */
+  @Override
+  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
+  {
+    scanner.rewind();
+    for (; !scanner.atEnd(); scanner.advance()) {
+      Entry en = scanner.entry();
+      byte[] key = new byte[en.getKeyLength()];
+      byte[] value = new byte[en.getValueLength()];
+      en.getKey(key);
+      en.getValue(value);
+      data.put(new Slice(key, 0, key.length), value);
+    }
+  }
+
+  @Override
+  public void reset() throws IOException
+  {
+    scanner.rewind();
+  }
+
+  @Override
+  public boolean seek(Slice key) throws IOException
+  {
+    return scanner.seekTo(key.buffer, key.offset, key.length);
+  }
+
+  /**
+   * Copies the current entry into new arrays referenced by {@code key} and {@code value} and advances.
+   */
+  @Override
+  public boolean next(Slice key, Slice value) throws IOException
+  {
+    if (scanner.atEnd()) {
+      return false;
+    }
+    Entry en = scanner.entry();
+    byte[] rkey = new byte[en.getKeyLength()];
+    byte[] rval = new byte[en.getValueLength()];
+    en.getKey(rkey);
+    en.getValue(rval);
+
+    key.buffer = rkey;
+    key.offset = 0;
+    key.length = rkey.length;
+
+    value.buffer = rval;
+    value.offset = 0;
+    value.length = rval.length;
+
+    scanner.advance();
+    return true;
+  }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/TFileWriter.java
----------------------------------------------------------------------
diff --git a/tfile/TFileWriter.java b/tfile/TFileWriter.java
new file mode 100644
index 0000000..b6fd90d
--- /dev/null
+++ b/tfile/TFileWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht.tfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileWriter;
+
+/**
+ * {@link HDSFileWriter} that appends key/value pairs through a TFile {@link Writer}.
+ */
+public final class TFileWriter implements HDSFileWriter
+{
+  private final Writer writer;
+
+  private final FSDataOutputStream fsdos;
+
+  /**
+   * Creates a TFile writer on top of the given stream.
+   *
+   * @param stream output stream of the data file; closed by {@link #close()}
+   * @param minBlockSize minimum block size passed to the TFile writer
+   * @param compressName compression algorithm name passed to the TFile writer
+   * @param comparator key comparator name passed to the TFile writer
+   * @param conf configuration passed to the TFile writer
+   * @throws IOException if the writer cannot be created
+   */
+  public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException
+  {
+    this.fsdos = stream;
+    this.writer = new Writer(stream, minBlockSize, compressName, comparator, conf);
+  }
+
+  /**
+   * Closes the TFile writer and then the underlying stream. The stream is closed even if closing the writer fails.
+   */
+  @Override
+  public void close() throws IOException
+  {
+    try {
+      writer.close();
+    } finally {
+      fsdos.close();
+    }
+  }
+
+  @Override
+  public void append(byte[] key, byte[] value) throws IOException
+  {
+    writer.append(key, value);
+  }
+
+  /**
+   * @return current position of the underlying output stream
+   */
+  @Override
+  public long getBytesWritten() throws IOException
+  {
+    return fsdos.getPos();
+  }
+
+}
[4/7] incubator-apex-malhar git commit: MLHR-1721: Convert
NullPointerException to IOException during seek to avoid operator failure.
Posted by hs...@apache.org.
MLHR-1721: Convert NullPointerException to IOException during seek
to avoid operator failure.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/bd9cd5c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/bd9cd5c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/bd9cd5c9
Branch: refs/heads/devel-3
Commit: bd9cd5c932e44c401b8a334e0d4e7736c2980ea7
Parents: 818c683
Author: Tushar Gosavi <tu...@datatorrent.com>
Authored: Thu Apr 30 11:11:41 2015 +0530
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 23 21:37:02 2015 -0800
----------------------------------------------------------------------
tfile/TFileReader.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bd9cd5c9/tfile/TFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java
index 972b4f9..2f06da9 100644
--- a/tfile/TFileReader.java
+++ b/tfile/TFileReader.java
@@ -18,6 +18,7 @@ package com.datatorrent.contrib.hdht.tfile;
import java.io.IOException;
import java.util.TreeMap;
+import com.datatorrent.common.util.DTThrowable;
import com.datatorrent.common.util.Slice;
import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,7 @@ public class TFileReader implements HDSFileReader
private final Reader reader;
private final Scanner scanner;
private final FSDataInputStream fsdis;
+ private boolean closed = false;
public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
{
@@ -54,6 +56,7 @@ public class TFileReader implements HDSFileReader
@Override
public void close() throws IOException
{
+ closed = true;
scanner.close();
reader.close();
fsdis.close();
@@ -85,7 +88,14 @@ public class TFileReader implements HDSFileReader
@Override
public boolean seek(Slice key) throws IOException
{
- return scanner.seekTo(key.buffer, key.offset, key.length);
+ try {
+ return scanner.seekTo(key.buffer, key.offset, key.length);
+ } catch (NullPointerException ex) {
+ if (closed)
+ throw new IOException("Stream was closed");
+ else
+ throw ex;
+ }
}
@Override
[5/7] incubator-apex-malhar git commit: Adding missing @since tags
for 2.0.0 release
Posted by hs...@apache.org.
Adding missing @since tags for 2.0.0 release
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/818c683e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/818c683e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/818c683e
Branch: refs/heads/devel-3
Commit: 818c683e051233e92f5b1d3263d6f8e222d3d9b9
Parents: 1576ce7
Author: sashadt <sa...@datatorrent.com>
Authored: Fri Jan 30 18:44:47 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 23 21:37:02 2015 -0800
----------------------------------------------------------------------
HDHTFileAccess.java | 2 ++
HDHTFileAccessFSImpl.java | 2 ++
tfile/DTFileReader.java | 1 +
tfile/TFileImpl.java | 1 +
tfile/TFileReader.java | 5 +++++
tfile/TFileWriter.java | 5 +++++
6 files changed, 16 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/818c683e/HDHTFileAccess.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccess.java b/HDHTFileAccess.java
index fc3d56f..a26c1a7 100644
--- a/HDHTFileAccess.java
+++ b/HDHTFileAccess.java
@@ -25,6 +25,8 @@ import com.datatorrent.common.util.Slice;
/**
* Abstraction for file system and format interaction.
+ *
+ * @since 2.0.0
*/
public interface HDHTFileAccess extends Closeable
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/818c683e/HDHTFileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccessFSImpl.java b/HDHTFileAccessFSImpl.java
index ad9aa05..5c9cbfa 100644
--- a/HDHTFileAccessFSImpl.java
+++ b/HDHTFileAccessFSImpl.java
@@ -31,6 +31,8 @@ import com.datatorrent.common.util.DTThrowable;
/**
* Hadoop file system backed store.
+ *
+ * @since 2.0.0
*/
abstract public class HDHTFileAccessFSImpl implements HDHTFileAccess
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/818c683e/tfile/DTFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/DTFileReader.java b/tfile/DTFileReader.java
index fefadaf..a164b98 100644
--- a/tfile/DTFileReader.java
+++ b/tfile/DTFileReader.java
@@ -39,6 +39,7 @@ import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
* DTFileReader is also fully compatible with any file generated by {@link TFileWriter}. So there is no corresponding "DTFileWriter"
*
*
+ * @since 2.0.0
*/
public class DTFileReader implements HDSFileReader
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/818c683e/tfile/TFileImpl.java
----------------------------------------------------------------------
diff --git a/tfile/TFileImpl.java b/tfile/TFileImpl.java
index 714a5b1..5dc9464 100644
--- a/tfile/TFileImpl.java
+++ b/tfile/TFileImpl.java
@@ -34,6 +34,7 @@ import com.datatorrent.contrib.hdht.HDHTFileAccessFSImpl;
* <li>{@link TFileImpl.DTFileImpl} return DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}(which is faster than default TFile reader) and {@link Writer} for IO operations</li>
* </ul>
*
+ * @since 2.0.0
*/
public abstract class TFileImpl extends HDHTFileAccessFSImpl
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/818c683e/tfile/TFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java
index d20408c..972b4f9 100644
--- a/tfile/TFileReader.java
+++ b/tfile/TFileReader.java
@@ -28,6 +28,11 @@ import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry;
import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
+/**
+ * TFileReader
+ *
+ * @since 2.0.0
+ */
public class TFileReader implements HDSFileReader
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/818c683e/tfile/TFileWriter.java
----------------------------------------------------------------------
diff --git a/tfile/TFileWriter.java b/tfile/TFileWriter.java
index b6fd90d..549e1b8 100644
--- a/tfile/TFileWriter.java
+++ b/tfile/TFileWriter.java
@@ -23,6 +23,11 @@ import org.apache.hadoop.io.file.tfile.TFile.Writer;
import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileWriter;
+/**
+ * TFileWriter
+ *
+ * @since 2.0.0
+ */
public final class TFileWriter implements HDSFileWriter
{
private Writer writer;
[3/7] incubator-apex-malhar git commit: HDHT rename.
Posted by hs...@apache.org.
HDHT rename.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/1576ce7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/1576ce7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/1576ce7d
Branch: refs/heads/devel-3
Commit: 1576ce7d5366f1a0b79b72511956604092260c00
Parents: 4e47d23
Author: thomas <th...@datatorrent.com>
Authored: Fri Dec 19 17:43:21 2014 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 23 21:37:02 2015 -0800
----------------------------------------------------------------------
AbstractSinglePortHDSWriter.java | 194 ----------------------------------
1 file changed, 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1576ce7d/AbstractSinglePortHDSWriter.java
----------------------------------------------------------------------
diff --git a/AbstractSinglePortHDSWriter.java b/AbstractSinglePortHDSWriter.java
deleted file mode 100644
index 04fa602..0000000
--- a/AbstractSinglePortHDSWriter.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import javax.validation.constraints.Min;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultPartition;
-import com.datatorrent.api.Partitioner;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.common.util.Slice;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.collect.Lists;
-
-/**
- * Operator that receives data on port and writes it to the data store.
- * Implements partitioning, maps partition key to the store bucket.
- * The derived class supplies the codec for partitioning and key-value serialization.
- * @param <EVENT>
- */
-public abstract class AbstractSinglePortHDSWriter<EVENT> extends HDHTWriter implements Partitioner<AbstractSinglePortHDSWriter<EVENT>>
-{
- public interface HDSCodec<EVENT> extends StreamCodec<EVENT>
- {
- byte[] getKeyBytes(EVENT event);
- byte[] getValueBytes(EVENT event);
- EVENT fromKeyValue(Slice key, byte[] value);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractSinglePortHDSWriter.class);
-
- protected int partitionMask;
-
- protected Set<Integer> partitions;
-
- protected transient HDSCodec<EVENT> codec;
-
- @Min(1)
- private int partitionCount = 1;
-
- public final transient DefaultInputPort<EVENT> input = new DefaultInputPort<EVENT>()
- {
- @Override
- public void process(EVENT event)
- {
- try {
- processEvent(event);
- } catch (IOException e) {
- throw new RuntimeException("Error processing " + event, e);
- }
- }
-
- @Override
- public StreamCodec<EVENT> getStreamCodec()
- {
- return getCodec();
- }
- };
-
- public void setPartitionCount(int partitionCount)
- {
- this.partitionCount = partitionCount;
- }
-
- public int getPartitionCount()
- {
- return partitionCount;
- }
-
- /**
- * Storage bucket for the given event. Only one partition can write to a storage bucket and by default it is
- * identified by the partition id.
- *
- * @param event
- * @return The bucket key.
- */
- protected long getBucketKey(EVENT event)
- {
- return (codec.getPartition(event) & partitionMask);
- }
-
- protected void processEvent(EVENT event) throws IOException
- {
- byte[] key = codec.getKeyBytes(event);
- byte[] value = codec.getValueBytes(event);
- super.put(getBucketKey(event), new Slice(key), value);
- }
-
- abstract protected HDSCodec<EVENT> getCodec();
-
- @Override
- public void setup(OperatorContext arg0)
- {
- LOG.debug("Store {} with partitions {} {}", super.getFileStore(), new PartitionKeys(this.partitionMask, this.partitions));
- super.setup(arg0);
- try {
- this.codec = getCodec();
- // inject the operator reference, if such field exists
- // TODO: replace with broader solution
- Class<?> cls = this.codec.getClass();
- while (cls != null) {
- for (Field field : cls.getDeclaredFields()) {
- if (field.getType().isAssignableFrom(this.getClass())) {
- field.setAccessible(true);
- field.set(this.codec, this);
- }
- }
- cls = cls.getSuperclass();
- }
- } catch (Exception e) {
- throw new RuntimeException("Failed to create codec", e);
- }
- }
-
- @Override
- public Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> definePartitions(Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> partitions, int incrementalCapacity)
- {
- boolean isInitialPartition = partitions.iterator().next().getStats() == null;
-
- if (!isInitialPartition) {
- // support for dynamic partitioning requires lineage tracking
- LOG.warn("Dynamic partitioning not implemented");
- return partitions;
- }
-
- int totalCount;
-
- //Get the size of the partition for parallel partitioning
- if(incrementalCapacity != 0) {
- totalCount = incrementalCapacity;
- }
- //Do normal partitioning
- else {
- totalCount = partitionCount;
- }
-
- Kryo lKryo = new Kryo();
- Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount);
- for (int i = 0; i < totalCount; i++) {
- // Kryo.copy fails as it attempts to clone transient fields (input port)
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Output output = new Output(bos);
- lKryo.writeObject(output, this);
- output.close();
- Input lInput = new Input(bos.toByteArray());
- @SuppressWarnings("unchecked")
- AbstractSinglePortHDSWriter<EVENT> oper = lKryo.readObject(lInput, this.getClass());
- newPartitions.add(new DefaultPartition<AbstractSinglePortHDSWriter<EVENT>>(oper));
- }
-
- // assign the partition keys
- DefaultPartition.assignPartitionKeys(newPartitions, input);
-
- for (Partition<AbstractSinglePortHDSWriter<EVENT>> p : newPartitions) {
- PartitionKeys pks = p.getPartitionKeys().get(input);
- p.getPartitionedInstance().partitionMask = pks.mask;
- p.getPartitionedInstance().partitions = pks.partitions;
- }
-
- return newPartitions;
- }
-
- @Override
- public void partitioned(Map<Integer, Partition<AbstractSinglePortHDSWriter<EVENT>>> arg0)
- {
- }
-
-}
[2/7] incubator-apex-malhar git commit: make the code compile with
open source Apex
Posted by hs...@apache.org.
make the code compile with open source Apex
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c7874616
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c7874616
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c7874616
Branch: refs/heads/devel-3
Commit: c7874616a300f84aac44af62eac4b0b534e547aa
Parents: bd9cd5c
Author: Chetan Narsude <ch...@datatorrent.com>
Authored: Mon Jun 22 17:07:41 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 23 21:37:02 2015 -0800
----------------------------------------------------------------------
HDHTFileAccess.java | 2 +-
HDHTFileAccessFSImpl.java | 2 +-
tfile/DTFileReader.java | 2 +-
tfile/TFileReader.java | 4 ++--
4 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c7874616/HDHTFileAccess.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccess.java b/HDHTFileAccess.java
index a26c1a7..266ba75 100644
--- a/HDHTFileAccess.java
+++ b/HDHTFileAccess.java
@@ -21,7 +21,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.TreeMap;
-import com.datatorrent.common.util.Slice;
+import com.datatorrent.netlet.util.Slice;
/**
* Abstraction for file system and format interaction.
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c7874616/HDHTFileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccessFSImpl.java b/HDHTFileAccessFSImpl.java
index 5c9cbfa..13dd0ad 100644
--- a/HDHTFileAccessFSImpl.java
+++ b/HDHTFileAccessFSImpl.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
-import com.datatorrent.common.util.DTThrowable;
+import com.datatorrent.netlet.util.DTThrowable;
/**
* Hadoop file system backed store.
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c7874616/tfile/DTFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/DTFileReader.java b/tfile/DTFileReader.java
index a164b98..e61d475 100644
--- a/tfile/DTFileReader.java
+++ b/tfile/DTFileReader.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
import org.apache.hadoop.io.file.tfile.TFile;
-import com.datatorrent.common.util.Slice;
+import com.datatorrent.netlet.util.Slice;
import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c7874616/tfile/TFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java
index 2f06da9..0994666 100644
--- a/tfile/TFileReader.java
+++ b/tfile/TFileReader.java
@@ -18,8 +18,8 @@ package com.datatorrent.contrib.hdht.tfile;
import java.io.IOException;
import java.util.TreeMap;
-import com.datatorrent.common.util.DTThrowable;
-import com.datatorrent.common.util.Slice;
+import com.datatorrent.netlet.util.DTThrowable;
+import com.datatorrent.netlet.util.Slice;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
[6/7] incubator-apex-malhar git commit: MLHR-1916 #resolve #comment
Added back the FileAccess api and its implementations
Posted by hs...@apache.org.
MLHR-1916 #resolve #comment Added back the FileAccess api and its implementations
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/7d2f4749
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/7d2f4749
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/7d2f4749
Branch: refs/heads/devel-3
Commit: 7d2f47491498c6b1c550f70e626dd76ba1db393e
Parents: c787461
Author: MalharJenkins <je...@datatorrent.com>
Authored: Mon Nov 23 21:14:41 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 23 22:11:18 2015 -0800
----------------------------------------------------------------------
HDHTFileAccess.java | 124 -------------
HDHTFileAccessFSImpl.java | 127 -------------
.../lib/fileaccess/DTFileReader.java | 112 ++++++++++++
.../datatorrent/lib/fileaccess/FileAccess.java | 129 ++++++++++++++
.../lib/fileaccess/FileAccessFSImpl.java | 130 ++++++++++++++
.../datatorrent/lib/fileaccess/TFileImpl.java | 178 +++++++++++++++++++
.../datatorrent/lib/fileaccess/TFileReader.java | 125 +++++++++++++
.../datatorrent/lib/fileaccess/TFileWriter.java | 61 +++++++
pom.xml | 2 +-
tfile/DTFileReader.java | 111 ------------
tfile/TFileImpl.java | 177 ------------------
tfile/TFileReader.java | 125 -------------
tfile/TFileWriter.java | 60 -------
13 files changed, 736 insertions(+), 725 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/HDHTFileAccess.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccess.java b/HDHTFileAccess.java
deleted file mode 100644
index 266ba75..0000000
--- a/HDHTFileAccess.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht;
-
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.TreeMap;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * Abstraction for file system and format interaction.
- *
- * @since 2.0.0
- */
-public interface HDHTFileAccess extends Closeable
-{
- void init();
-
- DataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException;
- DataInputStream getInputStream(long bucketKey, String fileName) throws IOException;
-
- /**
- * Atomic file rename.
- * @param bucketKey
- * @param oldName
- * @param newName
- * @throws IOException
- */
- void rename(long bucketKey, String oldName, String newName) throws IOException;
- void delete(long bucketKey, String fileName) throws IOException;
-
- long getFileSize(long bucketKey, String s) throws IOException;
-
- /**
- * HDHT Data File Format Reader
- */
- interface HDSFileReader extends Closeable
- {
- /**
- * Read the entire contents of the underlying file into a TreeMap structure
- * @param data
- * @throws IOException
- */
- //Move to
- // void readFully(TreeMap<Slice, Slice> data) throws IOException;
- void readFully(TreeMap<Slice, byte[]> data) throws IOException;
-
- /**
- * Repositions the pointer to the beginning of the underlying file.
- * @throws IOException
- */
- void reset() throws IOException;
-
- /**
- * Searches for a matching key, and positions the pointer before the start of the key.
- * @param key Byte array representing the key
- * @throws IOException
- * @return true if a given key is found
- */
- boolean seek(Slice key) throws IOException;
-
- /**
- * Reads next available key/value pair starting from the current pointer position
- * into Slice objects and advances pointer to next key. If pointer is at the end
- * of the file, false is returned, and Slice objects remains unmodified.
- *
- * @param key Empty slice object
- * @param value Empty slice object
- * @return True if key/value were successfully read, false otherwise
- * @throws IOException
- */
- boolean next(Slice key, Slice value) throws IOException;
-
- }
-
- /**
- * HDHT Data File Format Writer
- */
- interface HDSFileWriter extends Closeable {
- /**
- * Appends key/value pair to the underlying file.
- * @param key
- * @param value
- * @throws IOException
- */
- void append(byte[] key, byte[] value) throws IOException;
-
- /**
- * Returns number of bytes written to the underlying stream.
- * @return The bytes written.
- * @throws IOException
- */
- long getBytesWritten() throws IOException;
- }
-
- /**
- * Obtain a reader for the given data file. Since existing file formats may depend on the file system directly (vs.
- * work just based on InputStream), construction of the reader is part of the file system abstraction itself.
- */
- public HDSFileReader getReader(long bucketKey, String fileName) throws IOException;
-
- /**
- * Obtain a writer for the given data file. Since existing file formats may depend on the file system directly (vs.
- * work just based on OutputStream), construction of the writer is part of the file system abstraction itself.
- */
- public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/HDHTFileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccessFSImpl.java b/HDHTFileAccessFSImpl.java
deleted file mode 100644
index 13dd0ad..0000000
--- a/HDHTFileAccessFSImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht;
-
-import java.io.IOException;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.Path;
-
-import com.datatorrent.netlet.util.DTThrowable;
-
-/**
- * Hadoop file system backed store.
- *
- * @since 2.0.0
- */
-abstract public class HDHTFileAccessFSImpl implements HDHTFileAccess
-{
- @NotNull
- private String basePath;
- protected transient FileSystem fs;
-
- public HDHTFileAccessFSImpl()
- {
- }
-
- public String getBasePath()
- {
- return basePath;
- }
-
- public void setBasePath(String path)
- {
- this.basePath = path;
- }
-
- protected Path getFilePath(long bucketKey, String fileName) {
- return new Path(getBucketPath(bucketKey), fileName);
- }
-
- protected Path getBucketPath(long bucketKey)
- {
- return new Path(basePath, Long.toString(bucketKey));
- }
-
- @Override
- public long getFileSize(long bucketKey, String fileName) throws IOException {
- return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen();
- }
-
- @Override
- public void close() throws IOException
- {
- fs.close();
- }
-
- @Override
- public void init()
- {
- if (fs == null) {
- Path dataFilePath = new Path(basePath);
- try {
- fs = FileSystem.newInstance(dataFilePath.toUri(), new Configuration());
- } catch (IOException e) {
- DTThrowable.rethrow(e);
- }
- }
- }
-
- @Override
- public void delete(long bucketKey, String fileName) throws IOException
- {
- fs.delete(getFilePath(bucketKey, fileName), true);
- }
-
- @Override
- public FSDataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException
- {
- Path path = getFilePath(bucketKey, fileName);
- return fs.create(path, true);
- }
-
- @Override
- public FSDataInputStream getInputStream(long bucketKey, String fileName) throws IOException
- {
- return fs.open(getFilePath(bucketKey, fileName));
- }
-
- @Override
- public void rename(long bucketKey, String fromName, String toName) throws IOException
- {
- FileContext fc = FileContext.getFileContext(fs.getUri());
- Path bucketPath = getBucketPath(bucketKey);
- // file context requires absolute path
- if (!bucketPath.isAbsolute()) {
- bucketPath = new Path(fs.getWorkingDirectory(), bucketPath);
- }
- fc.rename(new Path(bucketPath, fromName), new Path(bucketPath, toName), Rename.OVERWRITE);
- }
-
- @Override
- public String toString()
- {
- return this.getClass().getSimpleName() + "[basePath=" + basePath + "]";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java b/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
new file mode 100644
index 0000000..cb97520
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.file.tfile.DTFile;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
+import org.apache.hadoop.io.file.tfile.TFile;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * {@link DTFile} wrapper for HDSFileReader
+ * <br>
+ * {@link DTFile} has exact same format as {@link TFile} with a much faster {@link Reader} implementation
+ * <br>
+ * DTFileReader is also fully compatible with any file generated by {@link TFileWriter}. So there is no corresponding "DTFileWriter"
+ *
+ *
+ * @since 2.0.0
+ */
+public class DTFileReader implements FileAccess.FileReader
+{
+ private final Reader reader;
+ private final Scanner scanner;
+ private final FSDataInputStream fsdis;
+
+ public DTFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
+ {
+ this.fsdis = fsdis;
+ reader = new Reader(fsdis, fileLength, conf);
+ scanner = reader.createScanner();
+ }
+
+ /**
+ * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
+ * @see java.io.Closeable#close()
+ */
+ @Override
+ public void close() throws IOException
+ {
+ scanner.close();
+ reader.close();
+ fsdis.close();
+ }
+
+ @Override
+ public void readFully(TreeMap<Slice, byte[]> data) throws IOException
+ {
+ scanner.rewind();
+ for (; !scanner.atEnd(); scanner.advance()) {
+ Entry en = scanner.entry();
+ Slice key = new Slice(en.getBlockBuffer(), en.getKeyOffset(), en.getKeyLength());
+ byte[] value = Arrays.copyOfRange(en.getBlockBuffer(), en.getValueOffset(), en.getValueOffset() + en.getValueLength());
+ data.put(key, value);
+ }
+
+ }
+
+ @Override
+ public void reset() throws IOException
+ {
+ scanner.rewind();
+ }
+
+ @Override
+ public boolean seek(Slice key) throws IOException
+ {
+ return scanner.seekTo(key.buffer, key.offset, key.length);
+ }
+
+ @Override
+ public boolean next(Slice key, Slice value) throws IOException
+ {
+ if (scanner.atEnd()) return false;
+ Entry en = scanner.entry();
+
+ key.buffer = en.getBlockBuffer();
+ key.offset = en.getKeyOffset();
+ key.length = en.getKeyLength();
+
+ value.buffer = en.getBlockBuffer();
+ value.offset = en.getValueOffset();
+ value.length = en.getValueLength();
+
+ scanner.advance();
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
new file mode 100644
index 0000000..4b7f6e5
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.TreeMap;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstraction for file system and format interaction.
+ *
+ * @since 2.0.0
+ */
+public interface FileAccess extends Closeable
+{
+ void init();
+
+ DataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException;
+
+ DataInputStream getInputStream(long bucketKey, String fileName) throws IOException;
+
+ /**
+ * Atomic file rename.
+ * @param bucketKey
+ * @param oldName
+ * @param newName
+ * @throws IOException
+ */
+ void rename(long bucketKey, String oldName, String newName) throws IOException;
+ void delete(long bucketKey, String fileName) throws IOException;
+
+ long getFileSize(long bucketKey, String s) throws IOException;
+
+ /**
+ * Data File Format Reader
+ */
+ interface FileReader extends Closeable
+ {
+ /**
+ * Read the entire contents of the underlying file into a TreeMap structure
+ * @param data
+ * @throws IOException
+ */
+ //Move to
+ // void readFully(TreeMap<Slice, Slice> data) throws IOException;
+ void readFully(TreeMap<Slice, byte[]> data) throws IOException;
+
+ /**
+ * Repositions the pointer to the beginning of the underlying file.
+ * @throws IOException
+ */
+ void reset() throws IOException;
+
+ /**
+ * Searches for a matching key, and positions the pointer before the start of the key.
+ * @param key Byte array representing the key
+ * @throws IOException
+ * @return true if a given key is found
+ */
+ boolean seek(Slice key) throws IOException;
+
+ /**
+ * Reads next available key/value pair starting from the current pointer position
+ * into Slice objects and advances pointer to next key. If pointer is at the end
+ * of the file, false is returned, and Slice objects remains unmodified.
+ *
+ * @param key Empty slice object
+ * @param value Empty slice object
+ * @return True if key/value were successfully read, false otherwise
+ * @throws IOException
+ */
+ boolean next(Slice key, Slice value) throws IOException;
+
+ }
+
+ /**
+ * Data File Format Writer
+ */
+ interface FileWriter extends Closeable
+ {
+ /**
+ * Appends key/value pair to the underlying file.
+ * @param key
+ * @param value
+ * @throws IOException
+ */
+ void append(byte[] key, byte[] value) throws IOException;
+
+ /**
+ * Returns number of bytes written to the underlying stream.
+ * @return The bytes written.
+ * @throws IOException
+ */
+ long getBytesWritten() throws IOException;
+ }
+
+ /**
+ * Obtain a reader for the given data file. Since existing file formats may depend on the file system directly (vs.
+ * work just based on InputStream), construction of the reader is part of the file system abstraction itself.
+ */
+ public FileReader getReader(long bucketKey, String fileName) throws IOException;
+
+ /**
+ * Obtain a writer for the given data file. Since existing file formats may depend on the file system directly (vs.
+ * work just based on OutputStream), construction of the writer is part of the file system abstraction itself.
+ */
+ public FileWriter getWriter(long bucketKey, String fileName) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
new file mode 100644
index 0000000..80a201a
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Hadoop file system backed store.
+ * <p>
+ * Files are addressed as {@code <basePath>/<bucketKey>/<fileName>}. The {@link FileSystem} is created by
+ * {@link #init()} (unless a subclass already set {@link #fs}) and released by {@link #close()}.
+ *
+ * @since 2.0.0
+ */
+public abstract class FileAccessFSImpl implements FileAccess
+{
+  @NotNull
+  private String basePath;
+  protected transient FileSystem fs;
+
+  public FileAccessFSImpl()
+  {
+  }
+
+  public String getBasePath()
+  {
+    return basePath;
+  }
+
+  public void setBasePath(String path)
+  {
+    this.basePath = path;
+  }
+
+  /**
+   * @return path of {@code fileName} inside the directory of the given bucket
+   */
+  protected Path getFilePath(long bucketKey, String fileName)
+  {
+    return new Path(getBucketPath(bucketKey), fileName);
+  }
+
+  /**
+   * @return directory of the given bucket, i.e. {@code <basePath>/<bucketKey>}
+   */
+  protected Path getBucketPath(long bucketKey)
+  {
+    return new Path(basePath, Long.toString(bucketKey));
+  }
+
+  @Override
+  public long getFileSize(long bucketKey, String fileName) throws IOException
+  {
+    return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen();
+  }
+
+  /**
+   * Closes the underlying file system instance. Does nothing when no instance exists (e.g. {@link #init()} was
+   * never called). Afterwards {@link #init()} creates a fresh instance instead of reusing the closed one.
+   */
+  @Override
+  public void close() throws IOException
+  {
+    if (fs == null) {
+      return;
+    }
+    try {
+      fs.close();
+    } finally {
+      // drop the reference so a later init() does not keep using a closed FileSystem
+      fs = null;
+    }
+  }
+
+  /**
+   * Creates a new (non-shared) {@link FileSystem} for {@link #getBasePath()} unless one is already set.
+   * Because this method declares no checked exceptions, an {@link IOException} is rethrown through
+   * {@code DTThrowable.rethrow}.
+   */
+  @Override
+  public void init()
+  {
+    if (fs == null) {
+      Path dataFilePath = new Path(basePath);
+      try {
+        fs = FileSystem.newInstance(dataFilePath.toUri(), new Configuration());
+      } catch (IOException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+  }
+
+  /**
+   * Deletes the given bucket file; the delete is issued with {@code recursive = true}.
+   */
+  @Override
+  public void delete(long bucketKey, String fileName) throws IOException
+  {
+    fs.delete(getFilePath(bucketKey, fileName), true);
+  }
+
+  /**
+   * Creates the given bucket file, overwriting it if it already exists.
+   */
+  @Override
+  public FSDataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException
+  {
+    Path path = getFilePath(bucketKey, fileName);
+    return fs.create(path, true);
+  }
+
+  @Override
+  public FSDataInputStream getInputStream(long bucketKey, String fileName) throws IOException
+  {
+    return fs.open(getFilePath(bucketKey, fileName));
+  }
+
+  /**
+   * Renames a file within one bucket, overwriting an existing file named {@code toName}.
+   */
+  @Override
+  public void rename(long bucketKey, String fromName, String toName) throws IOException
+  {
+    // build the FileContext from the file system's own configuration rather than a fresh default one,
+    // so both resolve the same settings
+    FileContext fc = FileContext.getFileContext(fs.getUri(), fs.getConf());
+    Path bucketPath = getBucketPath(bucketKey);
+    // file context requires absolute path
+    if (!bucketPath.isAbsolute()) {
+      bucketPath = new Path(fs.getWorkingDirectory(), bucketPath);
+    }
+    fc.rename(new Path(bucketPath, fromName), new Path(bucketPath, toName), Rename.OVERWRITE);
+  }
+
+  @Override
+  public String toString()
+  {
+    return this.getClass().getSimpleName() + "[basePath=" + basePath + "]";
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
new file mode 100644
index 0000000..5526832
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.file.tfile.DTFile;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+/**
+ * A TFile wrapper with FileAccess API
+ * <ul>
+ * <li>{@link TFileImpl.DefaultTFileImpl} returns the default TFile {@link Reader} and {@link Writer} for IO operations</li>
+ * <li>{@link TFileImpl.DTFileImpl} returns the DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader} (which is faster than the default TFile reader) and {@link Writer} for IO operations</li>
+ * </ul>
+ * <p>
+ * Before a reader or writer is created, the chunk size and the input/output buffer sizes configured on this object
+ * are copied into the file system's {@link Configuration}. If constructing the reader or writer fails, the stream
+ * that was opened for it is closed before the exception propagates.
+ *
+ * @since 2.0.0
+ */
+public abstract class TFileImpl extends FileAccessFSImpl
+{
+  private int minBlockSize = 64 * 1024;
+
+  private String compressName = TFile.COMPRESSION_NONE;
+
+  private String comparator = "memcmp";
+
+  private int chunkSize = 1024 * 1024;
+
+  private int inputBufferSize = 256 * 1024;
+
+  private int outputBufferSize = 256 * 1024;
+
+  /**
+   * Copies the TFile tuning properties held by this object into {@code conf}.
+   */
+  private void setupConfig(Configuration conf)
+  {
+    conf.set("tfile.io.chunk.size", String.valueOf(chunkSize));
+    conf.set("tfile.fs.input.buffer.size", String.valueOf(inputBufferSize));
+    conf.set("tfile.fs.output.buffer.size", String.valueOf(outputBufferSize));
+  }
+
+  @Override
+  public FileWriter getWriter(long bucketKey, String fileName) throws IOException
+  {
+    FSDataOutputStream fsdos = getOutputStream(bucketKey, fileName);
+    boolean handedOff = false;
+    try {
+      setupConfig(fs.getConf());
+      FileWriter writer = new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf());
+      handedOff = true;
+      return writer;
+    } finally {
+      if (!handedOff) {
+        // the writer never took ownership of the stream; close it so a failed open does not leak a handle
+        org.apache.hadoop.io.IOUtils.closeStream(fsdos);
+      }
+    }
+  }
+
+  public int getMinBlockSize()
+  {
+    return minBlockSize;
+  }
+
+  public void setMinBlockSize(int minBlockSize)
+  {
+    this.minBlockSize = minBlockSize;
+  }
+
+  public String getCompressName()
+  {
+    return compressName;
+  }
+
+  public void setCompressName(String compressName)
+  {
+    this.compressName = compressName;
+  }
+
+  public String getComparator()
+  {
+    return comparator;
+  }
+
+  public void setComparator(String comparator)
+  {
+    this.comparator = comparator;
+  }
+
+  public int getChunkSize()
+  {
+    return chunkSize;
+  }
+
+  public void setChunkSize(int chunkSize)
+  {
+    this.chunkSize = chunkSize;
+  }
+
+  public int getInputBufferSize()
+  {
+    return inputBufferSize;
+  }
+
+  public void setInputBufferSize(int inputBufferSize)
+  {
+    this.inputBufferSize = inputBufferSize;
+  }
+
+  public int getOutputBufferSize()
+  {
+    return outputBufferSize;
+  }
+
+  public void setOutputBufferSize(int outputBufferSize)
+  {
+    this.outputBufferSize = outputBufferSize;
+  }
+
+  /**
+   * Returns a {@link TFile} {@link Reader}.
+   */
+  public static class DefaultTFileImpl extends TFileImpl
+  {
+    @Override
+    public FileReader getReader(long bucketKey, String fileName) throws IOException
+    {
+      FSDataInputStream fsdis = getInputStream(bucketKey, fileName);
+      boolean handedOff = false;
+      try {
+        long fileLength = getFileSize(bucketKey, fileName);
+        super.setupConfig(fs.getConf());
+        FileReader reader = new TFileReader(fsdis, fileLength, fs.getConf());
+        handedOff = true;
+        return reader;
+      } finally {
+        if (!handedOff) {
+          // the reader never took ownership of the stream; close it so a failed open does not leak a handle
+          org.apache.hadoop.io.IOUtils.closeStream(fsdis);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns a {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}.
+   */
+  public static class DTFileImpl extends TFileImpl
+  {
+    @Override
+    public FileReader getReader(long bucketKey, String fileName) throws IOException
+    {
+      FSDataInputStream fsdis = getInputStream(bucketKey, fileName);
+      boolean handedOff = false;
+      try {
+        long fileLength = getFileSize(bucketKey, fileName);
+        super.setupConfig(fs.getConf());
+        FileReader reader = new DTFileReader(fsdis, fileLength, fs.getConf());
+        handedOff = true;
+        return reader;
+      } finally {
+        if (!handedOff) {
+          // the reader never took ownership of the stream; close it so a failed open does not leak a handle
+          org.apache.hadoop.io.IOUtils.closeStream(fsdis);
+        }
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
new file mode 100644
index 0000000..8426c3f
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * {@link FileAccess.FileReader} backed by a Hadoop TFile {@link Reader} and a single {@link Scanner}.
+ * <p>
+ * Keys and values handed out by {@link #readFully(TreeMap)} and {@link #next(Slice, Slice)} are copied into freshly
+ * allocated arrays. Once {@link #close()} has been called, every read operation fails with an {@link IOException}.
+ *
+ * @since 2.0.0
+ */
+public class TFileReader implements FileAccess.FileReader
+{
+  private final Reader reader;
+  private final Scanner scanner;
+  private final FSDataInputStream fsdis;
+  private boolean closed = false;
+
+  public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
+  {
+    this.fsdis = fsdis;
+    reader = new Reader(fsdis, fileLength, conf);
+    scanner = reader.createScanner();
+  }
+
+  /**
+   * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
+   * <p>
+   * The scanner, the reader and the stream are each closed even if closing an earlier one throws. Calling this
+   * method again after the first call has no effect.
+   *
+   * @see java.io.Closeable#close()
+   */
+  @Override
+  public void close() throws IOException
+  {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      scanner.close();
+    } finally {
+      try {
+        reader.close();
+      } finally {
+        fsdis.close();
+      }
+    }
+  }
+
+  /**
+   * Rewinds the scanner and copies every remaining entry into {@code data}.
+   */
+  @Override
+  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
+  {
+    ensureOpen();
+    scanner.rewind();
+    for (; !scanner.atEnd(); scanner.advance()) {
+      Entry en = scanner.entry();
+      byte[] key = new byte[en.getKeyLength()];
+      byte[] value = new byte[en.getValueLength()];
+      en.getKey(key);
+      en.getValue(value);
+      data.put(new Slice(key, 0, key.length), value);
+    }
+  }
+
+  @Override
+  public void reset() throws IOException
+  {
+    ensureOpen();
+    scanner.rewind();
+  }
+
+  @Override
+  public boolean seek(Slice key) throws IOException
+  {
+    // check explicitly instead of waiting for the scanner to fail with a NullPointerException after close
+    ensureOpen();
+    return scanner.seekTo(key.buffer, key.offset, key.length);
+  }
+
+  /**
+   * Copies the current entry into {@code key}/{@code value} and advances the scanner.
+   *
+   * @return false (leaving both slices untouched) when the scanner is already at the end
+   */
+  @Override
+  public boolean next(Slice key, Slice value) throws IOException
+  {
+    ensureOpen();
+    if (scanner.atEnd()) {
+      return false;
+    }
+    Entry en = scanner.entry();
+    byte[] rkey = new byte[en.getKeyLength()];
+    byte[] rval = new byte[en.getValueLength()];
+    en.getKey(rkey);
+    en.getValue(rval);
+
+    key.buffer = rkey;
+    key.offset = 0;
+    key.length = rkey.length;
+
+    value.buffer = rval;
+    value.offset = 0;
+    value.length = rval.length;
+
+    scanner.advance();
+    return true;
+  }
+
+  /**
+   * @throws IOException if {@link #close()} has already been called
+   */
+  private void ensureOpen() throws IOException
+  {
+    if (closed) {
+      throw new IOException("Stream was closed");
+    }
+  }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
new file mode 100644
index 0000000..b362987
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+/**
+ * {@link FileAccess.FileWriter} that writes key/value pairs through a Hadoop TFile {@link Writer}.
+ * <p>
+ * Closing this writer closes both the TFile writer and the wrapped output stream.
+ *
+ * @since 2.0.0
+ */
+public final class TFileWriter implements FileAccess.FileWriter
+{
+  private final Writer writer;
+
+  private final FSDataOutputStream fsdos;
+
+  /**
+   * @param stream       output stream the TFile is written to; closed by {@link #close()}
+   * @param minBlockSize minimum block size passed to the TFile writer
+   * @param compressName compression name passed to the TFile writer
+   * @param comparator   comparator name passed to the TFile writer
+   * @param conf         configuration passed to the TFile writer
+   * @throws IOException if the TFile writer cannot be created
+   */
+  public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator,
+      Configuration conf) throws IOException
+  {
+    this.fsdos = stream;
+    writer = new Writer(stream, minBlockSize, compressName, comparator, conf);
+  }
+
+  /**
+   * Closes the TFile writer and then the wrapped output stream. The stream is closed even when closing the TFile
+   * writer throws, so the file handle is not leaked.
+   */
+  @Override
+  public void close() throws IOException
+  {
+    try {
+      writer.close();
+    } finally {
+      fsdos.close();
+    }
+  }
+
+  @Override
+  public void append(byte[] key, byte[] value) throws IOException
+  {
+    writer.append(key, value);
+  }
+
+  /**
+   * @return current position of the wrapped output stream
+   */
+  @Override
+  public long getBytesWritten() throws IOException
+  {
+    return fsdos.getPos();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 92466ab..678540d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,7 +144,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
- <maxAllowedViolations>8768</maxAllowedViolations>
+ <maxAllowedViolations>8789</maxAllowedViolations>
</configuration>
</plugin>
</plugins>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/DTFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/DTFileReader.java b/tfile/DTFileReader.java
deleted file mode 100644
index e61d475..0000000
--- a/tfile/DTFileReader.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.TreeMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.file.tfile.DTFile;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
-import org.apache.hadoop.io.file.tfile.TFile;
-
-import com.datatorrent.netlet.util.Slice;
-import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
-
-/**
- * {@link DTFile} wrapper for HDSFileReader
- * <br>
- * {@link DTFile} has exactly the same format as {@link TFile} with a much faster {@link Reader} implementation
- * <br>
- * DTFileReader is also fully compatible with any file generated by {@link TFileWriter}. So there is no corresponding "DTFileWriter"
- * <br>
- * Unlike TFileReader, keys (and, in next(), values) are returned as slices over the scanner's block buffer
- * instead of copies.
- *
- * @since 2.0.0
- */
-public class DTFileReader implements HDSFileReader
-{
- private final Reader reader;
- private final Scanner scanner;
- private final FSDataInputStream fsdis;
-
- public DTFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
- {
- this.fsdis = fsdis;
- reader = new Reader(fsdis, fileLength, conf);
- scanner = reader.createScanner();
- }
-
- /**
- * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
- * NOTE(review): if scanner.close() or reader.close() throws, the remaining resources are not closed.
- * @see java.io.Closeable#close()
- */
- @Override
- public void close() throws IOException
- {
- scanner.close();
- reader.close();
- fsdis.close();
- }
-
- /**
- * Rewinds the scanner and puts every entry into data.
- */
- @Override
- public void readFully(TreeMap<Slice, byte[]> data) throws IOException
- {
- scanner.rewind();
- for (; !scanner.atEnd(); scanner.advance()) {
- Entry en = scanner.entry();
- // Only the value is copied; the key slice references the entry's block buffer directly.
- // NOTE(review): confirm that buffer is not reused when the scanner moves to another block,
- // otherwise keys already stored in data could change.
- Slice key = new Slice(en.getBlockBuffer(), en.getKeyOffset(), en.getKeyLength());
- byte[] value = Arrays.copyOfRange(en.getBlockBuffer(), en.getValueOffset(), en.getValueOffset() + en.getValueLength());
- data.put(key, value);
- }
-
- }
-
- @Override
- public void reset() throws IOException
- {
- scanner.rewind();
- }
-
- @Override
- public boolean seek(Slice key) throws IOException
- {
- return scanner.seekTo(key.buffer, key.offset, key.length);
- }
-
- /**
- * Points key and value at the current entry and advances the scanner; returns false at the end.
- * The slices reference the scanner's block buffer, presumably valid only until the scanner
- * moves on — TODO confirm.
- */
- @Override
- public boolean next(Slice key, Slice value) throws IOException
- {
- if (scanner.atEnd()) return false;
- Entry en = scanner.entry();
-
- key.buffer = en.getBlockBuffer();
- key.offset = en.getKeyOffset();
- key.length = en.getKeyLength();
-
- value.buffer = en.getBlockBuffer();
- value.offset = en.getValueOffset();
- value.length = en.getValueLength();
-
- scanner.advance();
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileImpl.java
----------------------------------------------------------------------
diff --git a/tfile/TFileImpl.java b/tfile/TFileImpl.java
deleted file mode 100644
index 5dc9464..0000000
--- a/tfile/TFileImpl.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.file.tfile.DTFile;
-import org.apache.hadoop.io.file.tfile.TFile;
-import org.apache.hadoop.io.file.tfile.TFile.Reader;
-import org.apache.hadoop.io.file.tfile.TFile.Writer;
-
-import com.datatorrent.contrib.hdht.HDHTFileAccessFSImpl;
-
-/**
- * A TFile wrapper with HDHTFileAccess API
- * <ul>
- * <li>{@link TFileImpl.DefaultTFileImpl} returns the default TFile {@link Reader} and {@link Writer} for IO operations</li>
- * <li>{@link TFileImpl.DTFileImpl} returns the DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader} (which is faster than the default TFile reader) and {@link Writer} for IO operations</li>
- * </ul>
- * Before a reader or writer is created, the chunk size and the input/output buffer sizes held here are
- * copied into the file system's Configuration.
- *
- * @since 2.0.0
- */
-public abstract class TFileImpl extends HDHTFileAccessFSImpl
-{
- private int minBlockSize = 64 * 1024;
-
- private String compressName = TFile.COMPRESSION_NONE;
-
- private String comparator = "memcmp";
-
- private int chunkSize = 1024 * 1024;
-
- private int inputBufferSize = 256 * 1024;
-
- private int outputBufferSize = 256 * 1024;
-
-
- // Copies the TFile tuning properties of this object into conf.
- private void setupConfig(Configuration conf)
- {
- conf.set("tfile.io.chunk.size", String.valueOf(chunkSize));
- conf.set("tfile.fs.input.buffer.size", String.valueOf(inputBufferSize));
- conf.set("tfile.fs.output.buffer.size", String.valueOf(outputBufferSize));
- }
-
-
- // NOTE(review): if the TFileWriter constructor throws, fsdos is never closed.
- @Override
- public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException
- {
- FSDataOutputStream fsdos = getOutputStream(bucketKey, fileName);
- setupConfig(fs.getConf());
- return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf());
- }
-
- public int getMinBlockSize()
- {
- return minBlockSize;
- }
-
-
- public void setMinBlockSize(int minBlockSize)
- {
- this.minBlockSize = minBlockSize;
- }
-
-
- public String getCompressName()
- {
- return compressName;
- }
-
-
- public void setCompressName(String compressName)
- {
- this.compressName = compressName;
- }
-
-
- public String getComparator()
- {
- return comparator;
- }
-
-
- public void setComparator(String comparator)
- {
- this.comparator = comparator;
- }
-
-
- public int getChunkSize()
- {
- return chunkSize;
- }
-
-
- public void setChunkSize(int chunkSize)
- {
- this.chunkSize = chunkSize;
- }
-
-
- public int getInputBufferSize()
- {
- return inputBufferSize;
- }
-
-
- public void setInputBufferSize(int inputBufferSize)
- {
- this.inputBufferSize = inputBufferSize;
- }
-
-
- public int getOutputBufferSize()
- {
- return outputBufferSize;
- }
-
-
- public void setOutputBufferSize(int outputBufferSize)
- {
- this.outputBufferSize = outputBufferSize;
- }
-
- /**
- * Returns a {@link TFile} {@link Reader}.
- *
- */
- public static class DefaultTFileImpl extends TFileImpl{
-
- // NOTE(review): if getFileSize or the TFileReader constructor throws, fsdis is never closed.
- @Override
- public HDSFileReader getReader(long bucketKey, String fileName) throws IOException
- {
- FSDataInputStream fsdis = getInputStream(bucketKey, fileName);
- long fileLength = getFileSize(bucketKey, fileName);
- super.setupConfig(fs.getConf());
- return new TFileReader(fsdis, fileLength, fs.getConf());
- }
-
- }
-
-
- /**
- * Returns a {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}.
- *
- */
- public static class DTFileImpl extends TFileImpl {
-
- // NOTE(review): if getFileSize or the DTFileReader constructor throws, fsdis is never closed.
- @Override
- public HDSFileReader getReader(long bucketKey, String fileName) throws IOException
- {
- FSDataInputStream fsdis = getInputStream(bucketKey, fileName);
- long fileLength = getFileSize(bucketKey, fileName);
- super.setupConfig(fs.getConf());
- return new DTFileReader(fsdis, fileLength, fs.getConf());
- }
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java
deleted file mode 100644
index 0994666..0000000
--- a/tfile/TFileReader.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-import java.util.TreeMap;
-
-import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.netlet.util.Slice;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.file.tfile.TFile.Reader;
-import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
-import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry;
-
-import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
-
-/**
- * TFileReader: HDSFileReader backed by a TFile Reader and a single Scanner.
- * Keys and values are copied into freshly allocated arrays.
- *
- * @since 2.0.0
- */
-public class TFileReader implements HDSFileReader
-{
-
- private final Reader reader;
- private final Scanner scanner;
- private final FSDataInputStream fsdis;
- // set by close(); consulted by seek() to report use after close
- private boolean closed = false;
-
- public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
- {
- this.fsdis = fsdis;
- reader = new Reader(fsdis, fileLength, conf);
- scanner = reader.createScanner();
- }
-
- /**
- * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
- * NOTE(review): if scanner.close() or reader.close() throws, the remaining resources are not closed.
- * @see java.io.Closeable#close()
- */
- @Override
- public void close() throws IOException
- {
- closed = true;
- scanner.close();
- reader.close();
- fsdis.close();
- }
-
- /**
- * Rewinds the scanner and copies every entry into data.
- */
- @Override
- public void readFully(TreeMap<Slice, byte[]> data) throws IOException
- {
- scanner.rewind();
- for (; !scanner.atEnd(); scanner.advance()) {
- Entry en = scanner.entry();
- int klen = en.getKeyLength();
- int vlen = en.getValueLength();
- byte[] key = new byte[klen];
- byte[] value = new byte[vlen];
- en.getKey(key);
- en.getValue(value);
- data.put(new Slice(key, 0, key.length), value);
- }
-
- }
-
- @Override
- public void reset() throws IOException
- {
- scanner.rewind();
- }
-
- /**
- * Seeks the scanner to key. A NullPointerException raised after close() is translated into an
- * IOException; any other NullPointerException is rethrown.
- * NOTE(review): an explicit closed check would avoid using exceptions for control flow.
- */
- @Override
- public boolean seek(Slice key) throws IOException
- {
- try {
- return scanner.seekTo(key.buffer, key.offset, key.length);
- } catch (NullPointerException ex) {
- if (closed)
- throw new IOException("Stream was closed");
- else
- throw ex;
- }
- }
-
- /**
- * Copies the current entry into key/value and advances the scanner; returns false at the end.
- */
- @Override
- public boolean next(Slice key, Slice value) throws IOException
- {
- if (scanner.atEnd()) return false;
- Entry en = scanner.entry();
- byte[] rkey = new byte[en.getKeyLength()];
- byte[] rval = new byte[en.getValueLength()];
- en.getKey(rkey);
- en.getValue(rval);
-
- key.buffer = rkey;
- key.offset = 0;
- key.length = en.getKeyLength();
-
- value.buffer = rval;
- value.offset = 0;
- value.length = en.getValueLength();
-
- scanner.advance();
- return true;
- }
-
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileWriter.java
----------------------------------------------------------------------
diff --git a/tfile/TFileWriter.java b/tfile/TFileWriter.java
deleted file mode 100644
index 549e1b8..0000000
--- a/tfile/TFileWriter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.file.tfile.TFile.Writer;
-
-import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileWriter;
-
-/**
- * TFileWriter: HDSFileWriter that appends key/value pairs through a TFile Writer.
- * Closing it closes both the TFile writer and the wrapped output stream.
- *
- * @since 2.0.0
- */
-public final class TFileWriter implements HDSFileWriter
-{
- private Writer writer;
-
- private FSDataOutputStream fsdos;
-
- public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException
- {
- this.fsdos = stream;
- writer = new Writer(stream, minBlockSize, compressName, comparator, conf);
-
- }
-
- // NOTE(review): if writer.close() throws, fsdos is never closed.
- @Override
- public void close() throws IOException
- {
- writer.close();
- fsdos.close();
- }
-
- @Override
- public void append(byte[] key, byte[] value) throws IOException
- {
- writer.append(key, value);
- }
-
- // Reports the current position of the wrapped output stream.
- @Override
- public long getBytesWritten() throws IOException{ return fsdos.getPos(); }
-
-}
[7/7] incubator-apex-malhar git commit: Merge branch 'MLHR-1916' of
https://github.com/chandnisingh/incubator-apex-malhar into devel-3
Posted by hs...@apache.org.
Merge branch 'MLHR-1916' of https://github.com/chandnisingh/incubator-apex-malhar into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/333a7073
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/333a7073
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/333a7073
Branch: refs/heads/devel-3
Commit: 333a7073308a7dc5beecb50b731d0904ecc1da24
Parents: e24c14c 7d2f474
Author: Siyuan Hua <hs...@apache.org>
Authored: Mon Nov 30 13:27:55 2015 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Mon Nov 30 13:27:55 2015 -0800
----------------------------------------------------------------------
.../lib/fileaccess/DTFileReader.java | 112 ++++++++++++
.../datatorrent/lib/fileaccess/FileAccess.java | 129 ++++++++++++++
.../lib/fileaccess/FileAccessFSImpl.java | 130 ++++++++++++++
.../datatorrent/lib/fileaccess/TFileImpl.java | 178 +++++++++++++++++++
.../datatorrent/lib/fileaccess/TFileReader.java | 125 +++++++++++++
.../datatorrent/lib/fileaccess/TFileWriter.java | 61 +++++++
pom.xml | 2 +-
7 files changed, 736 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/333a7073/pom.xml
----------------------------------------------------------------------