You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2015/11/30 22:28:31 UTC
[6/7] incubator-apex-malhar git commit: MLHR-1916 #resolve #comment
Added back the FileAccess api and its implementations
MLHR-1916 #resolve #comment Added back the FileAccess api and its implementations
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/7d2f4749
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/7d2f4749
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/7d2f4749
Branch: refs/heads/devel-3
Commit: 7d2f47491498c6b1c550f70e626dd76ba1db393e
Parents: c787461
Author: MalharJenkins <je...@datatorrent.com>
Authored: Mon Nov 23 21:14:41 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 23 22:11:18 2015 -0800
----------------------------------------------------------------------
HDHTFileAccess.java | 124 -------------
HDHTFileAccessFSImpl.java | 127 -------------
.../lib/fileaccess/DTFileReader.java | 112 ++++++++++++
.../datatorrent/lib/fileaccess/FileAccess.java | 129 ++++++++++++++
.../lib/fileaccess/FileAccessFSImpl.java | 130 ++++++++++++++
.../datatorrent/lib/fileaccess/TFileImpl.java | 178 +++++++++++++++++++
.../datatorrent/lib/fileaccess/TFileReader.java | 125 +++++++++++++
.../datatorrent/lib/fileaccess/TFileWriter.java | 61 +++++++
pom.xml | 2 +-
tfile/DTFileReader.java | 111 ------------
tfile/TFileImpl.java | 177 ------------------
tfile/TFileReader.java | 125 -------------
tfile/TFileWriter.java | 60 -------
13 files changed, 736 insertions(+), 725 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/HDHTFileAccess.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccess.java b/HDHTFileAccess.java
deleted file mode 100644
index 266ba75..0000000
--- a/HDHTFileAccess.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht;
-
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.TreeMap;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * Abstraction for file system and format interaction.
- *
- * @since 2.0.0
- */
-public interface HDHTFileAccess extends Closeable
-{
- void init();
-
- DataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException;
- DataInputStream getInputStream(long bucketKey, String fileName) throws IOException;
-
- /**
- * Atomic file rename.
- * @param bucketKey
- * @param oldName
- * @param newName
- * @throws IOException
- */
- void rename(long bucketKey, String oldName, String newName) throws IOException;
- void delete(long bucketKey, String fileName) throws IOException;
-
- long getFileSize(long bucketKey, String s) throws IOException;
-
- /**
- * HDHT Data File Format Reader
- */
- interface HDSFileReader extends Closeable
- {
- /**
- * Read the entire contents of the underlying file into a TreeMap structure
- * @param data
- * @throws IOException
- */
- //Move to
- // void readFully(TreeMap<Slice, Slice> data) throws IOException;
- void readFully(TreeMap<Slice, byte[]> data) throws IOException;
-
- /**
- * Repositions the pointer to the beginning of the underlying file.
- * @throws IOException
- */
- void reset() throws IOException;
-
- /**
- * Searches for a matching key, and positions the pointer before the start of the key.
- * @param key Byte array representing the key
- * @throws IOException
- * @return true if a given key is found
- */
- boolean seek(Slice key) throws IOException;
-
- /**
- * Reads next available key/value pair starting from the current pointer position
- * into Slice objects and advances pointer to next key. If pointer is at the end
- * of the file, false is returned, and Slice objects remains unmodified.
- *
- * @param key Empty slice object
- * @param value Empty slice object
- * @return True if key/value were successfully read, false otherwise
- * @throws IOException
- */
- boolean next(Slice key, Slice value) throws IOException;
-
- }
-
- /**
- * HDHT Data File Format Writer
- */
- interface HDSFileWriter extends Closeable {
- /**
- * Appends key/value pair to the underlying file.
- * @param key
- * @param value
- * @throws IOException
- */
- void append(byte[] key, byte[] value) throws IOException;
-
- /**
- * Returns number of bytes written to the underlying stream.
- * @return The bytes written.
- * @throws IOException
- */
- long getBytesWritten() throws IOException;
- }
-
- /**
- * Obtain a reader for the given data file. Since existing file formats may depend on the file system directly (vs.
- * work just based on InputStream), construction of the reader is part of the file system abstraction itself.
- */
- public HDSFileReader getReader(long bucketKey, String fileName) throws IOException;
-
- /**
- * Obtain a writer for the given data file. Since existing file formats may depend on the file system directly (vs.
- * work just based on OutputStream), construction of the writer is part of the file system abstraction itself.
- */
- public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/HDHTFileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccessFSImpl.java b/HDHTFileAccessFSImpl.java
deleted file mode 100644
index 13dd0ad..0000000
--- a/HDHTFileAccessFSImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht;
-
-import java.io.IOException;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.Path;
-
-import com.datatorrent.netlet.util.DTThrowable;
-
-/**
- * Hadoop file system backed store.
- *
- * @since 2.0.0
- */
-abstract public class HDHTFileAccessFSImpl implements HDHTFileAccess
-{
- @NotNull
- private String basePath;
- protected transient FileSystem fs;
-
- public HDHTFileAccessFSImpl()
- {
- }
-
- public String getBasePath()
- {
- return basePath;
- }
-
- public void setBasePath(String path)
- {
- this.basePath = path;
- }
-
- protected Path getFilePath(long bucketKey, String fileName) {
- return new Path(getBucketPath(bucketKey), fileName);
- }
-
- protected Path getBucketPath(long bucketKey)
- {
- return new Path(basePath, Long.toString(bucketKey));
- }
-
- @Override
- public long getFileSize(long bucketKey, String fileName) throws IOException {
- return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen();
- }
-
- @Override
- public void close() throws IOException
- {
- fs.close();
- }
-
- @Override
- public void init()
- {
- if (fs == null) {
- Path dataFilePath = new Path(basePath);
- try {
- fs = FileSystem.newInstance(dataFilePath.toUri(), new Configuration());
- } catch (IOException e) {
- DTThrowable.rethrow(e);
- }
- }
- }
-
- @Override
- public void delete(long bucketKey, String fileName) throws IOException
- {
- fs.delete(getFilePath(bucketKey, fileName), true);
- }
-
- @Override
- public FSDataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException
- {
- Path path = getFilePath(bucketKey, fileName);
- return fs.create(path, true);
- }
-
- @Override
- public FSDataInputStream getInputStream(long bucketKey, String fileName) throws IOException
- {
- return fs.open(getFilePath(bucketKey, fileName));
- }
-
- @Override
- public void rename(long bucketKey, String fromName, String toName) throws IOException
- {
- FileContext fc = FileContext.getFileContext(fs.getUri());
- Path bucketPath = getBucketPath(bucketKey);
- // file context requires absolute path
- if (!bucketPath.isAbsolute()) {
- bucketPath = new Path(fs.getWorkingDirectory(), bucketPath);
- }
- fc.rename(new Path(bucketPath, fromName), new Path(bucketPath, toName), Rename.OVERWRITE);
- }
-
- @Override
- public String toString()
- {
- return this.getClass().getSimpleName() + "[basePath=" + basePath + "]";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java b/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
new file mode 100644
index 0000000..cb97520
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.file.tfile.DTFile;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
+import org.apache.hadoop.io.file.tfile.TFile;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * {@link DTFile} wrapper for HDSFileReader
+ * <br>
+ * {@link DTFile} has exact same format as {@link TFile} with a much faster {@link Reader} implementation
+ * <br>
+ * DTFileReader is also fully compatible with any file generated by {@link TFileWriter}. So there is no corresponding "DTFileWriter"
+ *
+ *
+ * @since 2.0.0
+ */
+public class DTFileReader implements FileAccess.FileReader
+{
+ private final Reader reader;
+ private final Scanner scanner;
+ private final FSDataInputStream fsdis;
+
+ public DTFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
+ {
+ this.fsdis = fsdis;
+ reader = new Reader(fsdis, fileLength, conf);
+ scanner = reader.createScanner();
+ }
+
+ /**
+ * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
+ * @see java.io.Closeable#close()
+ */
+ @Override
+ public void close() throws IOException
+ {
+ scanner.close();
+ reader.close();
+ fsdis.close();
+ }
+
+ @Override
+ public void readFully(TreeMap<Slice, byte[]> data) throws IOException
+ {
+ scanner.rewind();
+ for (; !scanner.atEnd(); scanner.advance()) {
+ Entry en = scanner.entry();
+ Slice key = new Slice(en.getBlockBuffer(), en.getKeyOffset(), en.getKeyLength());
+ byte[] value = Arrays.copyOfRange(en.getBlockBuffer(), en.getValueOffset(), en.getValueOffset() + en.getValueLength());
+ data.put(key, value);
+ }
+
+ }
+
+ @Override
+ public void reset() throws IOException
+ {
+ scanner.rewind();
+ }
+
+ @Override
+ public boolean seek(Slice key) throws IOException
+ {
+ return scanner.seekTo(key.buffer, key.offset, key.length);
+ }
+
+ @Override
+ public boolean next(Slice key, Slice value) throws IOException
+ {
+ if (scanner.atEnd()) return false;
+ Entry en = scanner.entry();
+
+ key.buffer = en.getBlockBuffer();
+ key.offset = en.getKeyOffset();
+ key.length = en.getKeyLength();
+
+ value.buffer = en.getBlockBuffer();
+ value.offset = en.getValueOffset();
+ value.length = en.getValueLength();
+
+ scanner.advance();
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
new file mode 100644
index 0000000..4b7f6e5
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.TreeMap;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstraction for file system and format interaction.
+ *
+ * @since 2.0.0
+ */
+public interface FileAccess extends Closeable
+{
+ void init();
+
+ DataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException;
+
+ DataInputStream getInputStream(long bucketKey, String fileName) throws IOException;
+
+ /**
+ * Atomic file rename.
+ * @param bucketKey
+ * @param oldName
+ * @param newName
+ * @throws IOException
+ */
+ void rename(long bucketKey, String oldName, String newName) throws IOException;
+ void delete(long bucketKey, String fileName) throws IOException;
+
+ long getFileSize(long bucketKey, String s) throws IOException;
+
+ /**
+ * Data File Format Reader
+ */
+ interface FileReader extends Closeable
+ {
+ /**
+ * Read the entire contents of the underlying file into a TreeMap structure
+ * @param data
+ * @throws IOException
+ */
+ //Move to
+ // void readFully(TreeMap<Slice, Slice> data) throws IOException;
+ void readFully(TreeMap<Slice, byte[]> data) throws IOException;
+
+ /**
+ * Repositions the pointer to the beginning of the underlying file.
+ * @throws IOException
+ */
+ void reset() throws IOException;
+
+ /**
+ * Searches for a matching key, and positions the pointer before the start of the key.
+ * @param key Byte array representing the key
+ * @throws IOException
+ * @return true if a given key is found
+ */
+ boolean seek(Slice key) throws IOException;
+
+ /**
+ * Reads next available key/value pair starting from the current pointer position
+ * into Slice objects and advances pointer to next key. If pointer is at the end
+ * of the file, false is returned, and Slice objects remains unmodified.
+ *
+ * @param key Empty slice object
+ * @param value Empty slice object
+ * @return True if key/value were successfully read, false otherwise
+ * @throws IOException
+ */
+ boolean next(Slice key, Slice value) throws IOException;
+
+ }
+
+ /**
+ * Data File Format Writer
+ */
+ interface FileWriter extends Closeable
+ {
+ /**
+ * Appends key/value pair to the underlying file.
+ * @param key
+ * @param value
+ * @throws IOException
+ */
+ void append(byte[] key, byte[] value) throws IOException;
+
+ /**
+ * Returns number of bytes written to the underlying stream.
+ * @return The bytes written.
+ * @throws IOException
+ */
+ long getBytesWritten() throws IOException;
+ }
+
+ /**
+ * Obtain a reader for the given data file. Since existing file formats may depend on the file system directly (vs.
+ * work just based on InputStream), construction of the reader is part of the file system abstraction itself.
+ */
+ public FileReader getReader(long bucketKey, String fileName) throws IOException;
+
+ /**
+ * Obtain a writer for the given data file. Since existing file formats may depend on the file system directly (vs.
+ * work just based on OutputStream), construction of the writer is part of the file system abstraction itself.
+ */
+ public FileWriter getWriter(long bucketKey, String fileName) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
new file mode 100644
index 0000000..80a201a
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Hadoop file system backed store.
+ *
+ * @since 2.0.0
+ */
+public abstract class FileAccessFSImpl implements FileAccess
+{
+ @NotNull
+ private String basePath;
+ protected transient FileSystem fs;
+
+ public FileAccessFSImpl()
+ {
+ }
+
+ public String getBasePath()
+ {
+ return basePath;
+ }
+
+ public void setBasePath(String path)
+ {
+ this.basePath = path;
+ }
+
+ protected Path getFilePath(long bucketKey, String fileName) {
+ return new Path(getBucketPath(bucketKey), fileName);
+ }
+
+ protected Path getBucketPath(long bucketKey)
+ {
+ return new Path(basePath, Long.toString(bucketKey));
+ }
+
+ @Override
+ public long getFileSize(long bucketKey, String fileName) throws IOException {
+ return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ fs.close();
+ }
+
+ @Override
+ public void init()
+ {
+ if (fs == null) {
+ Path dataFilePath = new Path(basePath);
+ try {
+ fs = FileSystem.newInstance(dataFilePath.toUri(), new Configuration());
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+ }
+
+ @Override
+ public void delete(long bucketKey, String fileName) throws IOException
+ {
+ fs.delete(getFilePath(bucketKey, fileName), true);
+ }
+
+ @Override
+ public FSDataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException
+ {
+ Path path = getFilePath(bucketKey, fileName);
+ return fs.create(path, true);
+ }
+
+ @Override
+ public FSDataInputStream getInputStream(long bucketKey, String fileName) throws IOException
+ {
+ return fs.open(getFilePath(bucketKey, fileName));
+ }
+
+ @Override
+ public void rename(long bucketKey, String fromName, String toName) throws IOException
+ {
+ FileContext fc = FileContext.getFileContext(fs.getUri());
+ Path bucketPath = getBucketPath(bucketKey);
+ // file context requires absolute path
+ if (!bucketPath.isAbsolute()) {
+ bucketPath = new Path(fs.getWorkingDirectory(), bucketPath);
+ }
+ fc.rename(new Path(bucketPath, fromName), new Path(bucketPath, toName), Rename.OVERWRITE);
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.getClass().getSimpleName() + "[basePath=" + basePath + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
new file mode 100644
index 0000000..5526832
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.file.tfile.DTFile;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+/**
+ * A TFile wrapper with FileAccess API
+ * <ul>
+ * <li>{@link TFileImpl.DefaultTFileImpl} return default TFile {@link Reader} and {@link Writer} for IO operations</li>
+ * <li>{@link TFileImpl.DTFileImpl} return DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}(which is faster than default TFile reader) and {@link Writer} for IO operations</li>
+ * </ul>
+ *
+ * @since 2.0.0
+ */
+public abstract class TFileImpl extends FileAccessFSImpl
+{
+ private int minBlockSize = 64 * 1024;
+
+ private String compressName = TFile.COMPRESSION_NONE;
+
+ private String comparator = "memcmp";
+
+ private int chunkSize = 1024 * 1024;
+
+ private int inputBufferSize = 256 * 1024;
+
+ private int outputBufferSize = 256 * 1024;
+
+
+ private void setupConfig(Configuration conf)
+ {
+ conf.set("tfile.io.chunk.size", String.valueOf(chunkSize));
+ conf.set("tfile.fs.input.buffer.size", String.valueOf(inputBufferSize));
+ conf.set("tfile.fs.output.buffer.size", String.valueOf(outputBufferSize));
+ }
+
+
+ @Override
+ public FileWriter getWriter(long bucketKey, String fileName) throws IOException
+ {
+ FSDataOutputStream fsdos = getOutputStream(bucketKey, fileName);
+ setupConfig(fs.getConf());
+ return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf());
+ }
+
+ public int getMinBlockSize()
+ {
+ return minBlockSize;
+ }
+
+
+ public void setMinBlockSize(int minBlockSize)
+ {
+ this.minBlockSize = minBlockSize;
+ }
+
+
+ public String getCompressName()
+ {
+ return compressName;
+ }
+
+
+ public void setCompressName(String compressName)
+ {
+ this.compressName = compressName;
+ }
+
+
+ public String getComparator()
+ {
+ return comparator;
+ }
+
+
+ public void setComparator(String comparator)
+ {
+ this.comparator = comparator;
+ }
+
+
+ public int getChunkSize()
+ {
+ return chunkSize;
+ }
+
+
+ public void setChunkSize(int chunkSize)
+ {
+ this.chunkSize = chunkSize;
+ }
+
+
+ public int getInputBufferSize()
+ {
+ return inputBufferSize;
+ }
+
+
+ public void setInputBufferSize(int inputBufferSize)
+ {
+ this.inputBufferSize = inputBufferSize;
+ }
+
+
+ public int getOutputBufferSize()
+ {
+ return outputBufferSize;
+ }
+
+
+ public void setOutputBufferSize(int outputBufferSize)
+ {
+ this.outputBufferSize = outputBufferSize;
+ }
+
+ /**
+ * Return {@link TFile} {@link Reader}
+ *
+ */
+ public static class DefaultTFileImpl extends TFileImpl{
+
+ @Override
+ public FileReader getReader(long bucketKey, String fileName) throws IOException
+ {
+ FSDataInputStream fsdis = getInputStream(bucketKey, fileName);
+ long fileLength = getFileSize(bucketKey, fileName);
+ super.setupConfig(fs.getConf());
+ return new TFileReader(fsdis, fileLength, fs.getConf());
+ }
+
+ }
+
+
+ /**
+ * Return {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}
+ *
+ */
+ public static class DTFileImpl extends TFileImpl {
+
+ @Override
+ public FileReader getReader(long bucketKey, String fileName) throws IOException
+ {
+ FSDataInputStream fsdis = getInputStream(bucketKey, fileName);
+ long fileLength = getFileSize(bucketKey, fileName);
+ super.setupConfig(fs.getConf());
+ return new DTFileReader(fsdis, fileLength, fs.getConf());
+ }
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
new file mode 100644
index 0000000..8426c3f
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * TFileReader
+ *
+ * @since 2.0.0
+ */
+public class TFileReader implements FileAccess.FileReader
+{
+
+ private final Reader reader;
+ private final Scanner scanner;
+ private final FSDataInputStream fsdis;
+ private boolean closed = false;
+
+ public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
+ {
+ this.fsdis = fsdis;
+ reader = new Reader(fsdis, fileLength, conf);
+ scanner = reader.createScanner();
+ }
+
+ /**
+ * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
+ * @see java.io.Closeable#close()
+ */
+ @Override
+ public void close() throws IOException
+ {
+ closed = true;
+ scanner.close();
+ reader.close();
+ fsdis.close();
+ }
+
+ @Override
+ public void readFully(TreeMap<Slice, byte[]> data) throws IOException
+ {
+ scanner.rewind();
+ for (; !scanner.atEnd(); scanner.advance()) {
+ Entry en = scanner.entry();
+ int klen = en.getKeyLength();
+ int vlen = en.getValueLength();
+ byte[] key = new byte[klen];
+ byte[] value = new byte[vlen];
+ en.getKey(key);
+ en.getValue(value);
+ data.put(new Slice(key, 0, key.length), value);
+ }
+
+ }
+
+ @Override
+ public void reset() throws IOException
+ {
+ scanner.rewind();
+ }
+
+ @Override
+ public boolean seek(Slice key) throws IOException
+ {
+ try {
+ return scanner.seekTo(key.buffer, key.offset, key.length);
+ } catch (NullPointerException ex) {
+ if (closed)
+ throw new IOException("Stream was closed");
+ else
+ throw ex;
+ }
+ }
+
+ @Override
+ public boolean next(Slice key, Slice value) throws IOException
+ {
+ if (scanner.atEnd()) return false;
+ Entry en = scanner.entry();
+ byte[] rkey = new byte[en.getKeyLength()];
+ byte[] rval = new byte[en.getValueLength()];
+ en.getKey(rkey);
+ en.getValue(rval);
+
+ key.buffer = rkey;
+ key.offset = 0;
+ key.length = en.getKeyLength();
+
+ value.buffer = rval;
+ value.offset = 0;
+ value.length = en.getValueLength();
+
+ scanner.advance();
+ return true;
+ }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
new file mode 100644
index 0000000..b362987
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+/**
+ * TFileWriter
+ *
+ * @since 2.0.0
+ */
+public final class TFileWriter implements FileAccess.FileWriter
+{
+ private Writer writer;
+
+ private FSDataOutputStream fsdos;
+
+ public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException
+ {
+ this.fsdos = stream;
+ writer = new Writer(stream, minBlockSize, compressName, comparator, conf);
+
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ writer.close();
+ fsdos.close();
+ }
+
+ @Override
+ public void append(byte[] key, byte[] value) throws IOException
+ {
+ writer.append(key, value);
+ }
+
+ @Override
+ public long getBytesWritten() throws IOException{ return fsdos.getPos(); }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 92466ab..678540d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,7 +144,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
- <maxAllowedViolations>8768</maxAllowedViolations>
+ <maxAllowedViolations>8789</maxAllowedViolations>
</configuration>
</plugin>
</plugins>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/DTFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/DTFileReader.java b/tfile/DTFileReader.java
deleted file mode 100644
index e61d475..0000000
--- a/tfile/DTFileReader.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.TreeMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.file.tfile.DTFile;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
-import org.apache.hadoop.io.file.tfile.TFile;
-
-import com.datatorrent.netlet.util.Slice;
-import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
-
-/**
- * {@link DTFile} wrapper for HDSFileReader
- * <br>
- * {@link DTFile} has exact same format as {@link TFile} with a much faster {@link Reader} implementation
- * <br>
- * DTFileReader is also fully compatible with any file generated by {@link TFileWriter}. So there is no corresponding "DTFileWriter"
- *
- *
- * @since 2.0.0
- */
-public class DTFileReader implements HDSFileReader
-{
- private final Reader reader;
- private final Scanner scanner;
- private final FSDataInputStream fsdis;
-
- public DTFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
- {
- this.fsdis = fsdis;
- reader = new Reader(fsdis, fileLength, conf);
- scanner = reader.createScanner();
- }
-
- /**
- * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
- * @see java.io.Closeable#close()
- */
- @Override
- public void close() throws IOException
- {
- scanner.close();
- reader.close();
- fsdis.close();
- }
-
- @Override
- public void readFully(TreeMap<Slice, byte[]> data) throws IOException
- {
- scanner.rewind();
- for (; !scanner.atEnd(); scanner.advance()) {
- Entry en = scanner.entry();
- Slice key = new Slice(en.getBlockBuffer(), en.getKeyOffset(), en.getKeyLength());
- byte[] value = Arrays.copyOfRange(en.getBlockBuffer(), en.getValueOffset(), en.getValueOffset() + en.getValueLength());
- data.put(key, value);
- }
-
- }
-
- @Override
- public void reset() throws IOException
- {
- scanner.rewind();
- }
-
- @Override
- public boolean seek(Slice key) throws IOException
- {
- return scanner.seekTo(key.buffer, key.offset, key.length);
- }
-
- @Override
- public boolean next(Slice key, Slice value) throws IOException
- {
- if (scanner.atEnd()) return false;
- Entry en = scanner.entry();
-
- key.buffer = en.getBlockBuffer();
- key.offset = en.getKeyOffset();
- key.length = en.getKeyLength();
-
- value.buffer = en.getBlockBuffer();
- value.offset = en.getValueOffset();
- value.length = en.getValueLength();
-
- scanner.advance();
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileImpl.java
----------------------------------------------------------------------
diff --git a/tfile/TFileImpl.java b/tfile/TFileImpl.java
deleted file mode 100644
index 5dc9464..0000000
--- a/tfile/TFileImpl.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.file.tfile.DTFile;
-import org.apache.hadoop.io.file.tfile.TFile;
-import org.apache.hadoop.io.file.tfile.TFile.Reader;
-import org.apache.hadoop.io.file.tfile.TFile.Writer;
-
-import com.datatorrent.contrib.hdht.HDHTFileAccessFSImpl;
-
-/**
- * A TFile wrapper with HDHTFileAccess API
- * <ul>
- * <li>{@link TFileImpl.DefaultTFileImpl} return default TFile {@link Reader} and {@link Writer} for IO operations</li>
- * <li>{@link TFileImpl.DTFileImpl} return DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}(which is faster than default TFile reader) and {@link Writer} for IO operations</li>
- * </ul>
- *
- * @since 2.0.0
- */
-public abstract class TFileImpl extends HDHTFileAccessFSImpl
-{
- private int minBlockSize = 64 * 1024;
-
- private String compressName = TFile.COMPRESSION_NONE;
-
- private String comparator = "memcmp";
-
- private int chunkSize = 1024 * 1024;
-
- private int inputBufferSize = 256 * 1024;
-
- private int outputBufferSize = 256 * 1024;
-
-
- private void setupConfig(Configuration conf)
- {
- conf.set("tfile.io.chunk.size", String.valueOf(chunkSize));
- conf.set("tfile.fs.input.buffer.size", String.valueOf(inputBufferSize));
- conf.set("tfile.fs.output.buffer.size", String.valueOf(outputBufferSize));
- }
-
-
- @Override
- public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException
- {
- FSDataOutputStream fsdos = getOutputStream(bucketKey, fileName);
- setupConfig(fs.getConf());
- return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf());
- }
-
- public int getMinBlockSize()
- {
- return minBlockSize;
- }
-
-
- public void setMinBlockSize(int minBlockSize)
- {
- this.minBlockSize = minBlockSize;
- }
-
-
- public String getCompressName()
- {
- return compressName;
- }
-
-
- public void setCompressName(String compressName)
- {
- this.compressName = compressName;
- }
-
-
- public String getComparator()
- {
- return comparator;
- }
-
-
- public void setComparator(String comparator)
- {
- this.comparator = comparator;
- }
-
-
- public int getChunkSize()
- {
- return chunkSize;
- }
-
-
- public void setChunkSize(int chunkSize)
- {
- this.chunkSize = chunkSize;
- }
-
-
- public int getInputBufferSize()
- {
- return inputBufferSize;
- }
-
-
- public void setInputBufferSize(int inputBufferSize)
- {
- this.inputBufferSize = inputBufferSize;
- }
-
-
- public int getOutputBufferSize()
- {
- return outputBufferSize;
- }
-
-
- public void setOutputBufferSize(int outputBufferSize)
- {
- this.outputBufferSize = outputBufferSize;
- }
-
- /**
- * Return {@link TFile} {@link Reader}
- *
- */
- public static class DefaultTFileImpl extends TFileImpl{
-
- @Override
- public HDSFileReader getReader(long bucketKey, String fileName) throws IOException
- {
- FSDataInputStream fsdis = getInputStream(bucketKey, fileName);
- long fileLength = getFileSize(bucketKey, fileName);
- super.setupConfig(fs.getConf());
- return new TFileReader(fsdis, fileLength, fs.getConf());
- }
-
- }
-
-
- /**
- * Return {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}
- *
- */
- public static class DTFileImpl extends TFileImpl {
-
- @Override
- public HDSFileReader getReader(long bucketKey, String fileName) throws IOException
- {
- FSDataInputStream fsdis = getInputStream(bucketKey, fileName);
- long fileLength = getFileSize(bucketKey, fileName);
- super.setupConfig(fs.getConf());
- return new DTFileReader(fsdis, fileLength, fs.getConf());
- }
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java
deleted file mode 100644
index 0994666..0000000
--- a/tfile/TFileReader.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-import java.util.TreeMap;
-
-import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.netlet.util.Slice;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.file.tfile.TFile.Reader;
-import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
-import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry;
-
-import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
-
-/**
- * TFileReader
- *
- * @since 2.0.0
- */
-public class TFileReader implements HDSFileReader
-{
-
- private final Reader reader;
- private final Scanner scanner;
- private final FSDataInputStream fsdis;
- private boolean closed = false;
-
- public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
- {
- this.fsdis = fsdis;
- reader = new Reader(fsdis, fileLength, conf);
- scanner = reader.createScanner();
- }
-
- /**
- * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
- * @see java.io.Closeable#close()
- */
- @Override
- public void close() throws IOException
- {
- closed = true;
- scanner.close();
- reader.close();
- fsdis.close();
- }
-
- @Override
- public void readFully(TreeMap<Slice, byte[]> data) throws IOException
- {
- scanner.rewind();
- for (; !scanner.atEnd(); scanner.advance()) {
- Entry en = scanner.entry();
- int klen = en.getKeyLength();
- int vlen = en.getValueLength();
- byte[] key = new byte[klen];
- byte[] value = new byte[vlen];
- en.getKey(key);
- en.getValue(value);
- data.put(new Slice(key, 0, key.length), value);
- }
-
- }
-
- @Override
- public void reset() throws IOException
- {
- scanner.rewind();
- }
-
- @Override
- public boolean seek(Slice key) throws IOException
- {
- try {
- return scanner.seekTo(key.buffer, key.offset, key.length);
- } catch (NullPointerException ex) {
- if (closed)
- throw new IOException("Stream was closed");
- else
- throw ex;
- }
- }
-
- @Override
- public boolean next(Slice key, Slice value) throws IOException
- {
- if (scanner.atEnd()) return false;
- Entry en = scanner.entry();
- byte[] rkey = new byte[en.getKeyLength()];
- byte[] rval = new byte[en.getValueLength()];
- en.getKey(rkey);
- en.getValue(rval);
-
- key.buffer = rkey;
- key.offset = 0;
- key.length = en.getKeyLength();
-
- value.buffer = rval;
- value.offset = 0;
- value.length = en.getValueLength();
-
- scanner.advance();
- return true;
- }
-
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileWriter.java
----------------------------------------------------------------------
diff --git a/tfile/TFileWriter.java b/tfile/TFileWriter.java
deleted file mode 100644
index 549e1b8..0000000
--- a/tfile/TFileWriter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.file.tfile.TFile.Writer;
-
-import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileWriter;
-
-/**
- * TFileWriter
- *
- * @since 2.0.0
- */
-public final class TFileWriter implements HDSFileWriter
-{
- private Writer writer;
-
- private FSDataOutputStream fsdos;
-
- public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException
- {
- this.fsdos = stream;
- writer = new Writer(stream, minBlockSize, compressName, comparator, conf);
-
- }
-
- @Override
- public void close() throws IOException
- {
- writer.close();
- fsdos.close();
- }
-
- @Override
- public void append(byte[] key, byte[] value) throws IOException
- {
- writer.append(key, value);
- }
-
- @Override
- public long getBytesWritten() throws IOException{ return fsdos.getPos(); }
-
-}