Posted to commits@apex.apache.org by pr...@apache.org on 2015/09/24 08:05:45 UTC
[1/4] incubator-apex-malhar git commit: MLHR-1804 MLHR-1805 MLHR-1806
#resolve #comment Refactoring of file splitter and fix for kryo exception
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 ffc25e57c -> d710af9b1
MLHR-1804 MLHR-1805 MLHR-1806 #resolve #comment Refactoring of file splitter and fix for kryo exception
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ea202dae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ea202dae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ea202dae
Branch: refs/heads/devel-3
Commit: ea202daed756d8a0ca633ec14c0539451ddc581a
Parents: ffc25e5
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Tue Aug 25 12:08:21 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Tue Sep 22 15:24:51 2015 -0700
----------------------------------------------------------------------
.../datatorrent/lib/io/block/BlockMetadata.java | 78 ++-
.../lib/io/fs/AbstractFileSplitter.java | 539 +++++++++++++++++
.../com/datatorrent/lib/io/fs/FileSplitter.java | 6 +-
.../datatorrent/lib/io/fs/FileSplitterBase.java | 142 +++++
.../lib/io/fs/FileSplitterInput.java | 594 +++++++++++++++++++
.../lib/io/fs/FileSplitterBaseTest.java | 264 +++++++++
.../lib/io/fs/FileSplitterInputTest.java | 486 +++++++++++++++
7 files changed, 2096 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
index 255a67f..1f47f8f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,10 @@
*/
package com.datatorrent.lib.io.block;
+import javax.validation.constraints.NotNull;
+
+import com.google.common.base.Preconditions;
+
/**
* Represents the metadata of a block.
*
@@ -51,8 +55,9 @@ public interface BlockMetadata
{
private long offset;
private long length;
- private final boolean isLastBlock;
- private final long previousBlockId;
+ private boolean isLastBlock;
+ private long previousBlockId;
+ private long blockId;
@SuppressWarnings("unused")
protected AbstractBlockMetadata()
@@ -61,6 +66,7 @@ public interface BlockMetadata
length = -1;
isLastBlock = false;
previousBlockId = -1;
+ blockId = -1;
}
/**
@@ -71,12 +77,32 @@ public interface BlockMetadata
* @param isLastBlock true if this is the last block of file
* @param previousBlockId id of the previous block
*/
+ @Deprecated
public AbstractBlockMetadata(long offset, long length, boolean isLastBlock, long previousBlockId)
{
this.offset = offset;
this.length = length;
this.isLastBlock = isLastBlock;
this.previousBlockId = previousBlockId;
+ this.blockId = -1;
+ }
+
+ /**
+ * Constructs Block metadata
+ *
+ * @param blockId block id
+ * @param offset offset of the file in the block
+ * @param length length of the file in the block
+ * @param isLastBlock true if this is the last block of file
+ * @param previousBlockId id of the previous block
+ */
+ public AbstractBlockMetadata(long blockId, long offset, long length, boolean isLastBlock, long previousBlockId)
+ {
+ this.blockId = blockId;
+ this.offset = offset;
+ this.length = length;
+ this.isLastBlock = isLastBlock;
+ this.previousBlockId = previousBlockId;
}
@Override
@@ -89,14 +115,14 @@ public interface BlockMetadata
return false;
}
- AbstractBlockMetadata that = (AbstractBlockMetadata) o;
+ AbstractBlockMetadata that = (AbstractBlockMetadata)o;
return getBlockId() == that.getBlockId();
}
@Override
public int hashCode()
{
- return (int) getBlockId();
+ return (int)getBlockId();
}
@Override
@@ -133,11 +159,37 @@ public interface BlockMetadata
return isLastBlock;
}
+ public void setLastBlock(boolean lastBlock)
+ {
+ this.isLastBlock = lastBlock;
+ }
+
@Override
public long getPreviousBlockId()
{
return previousBlockId;
}
+
+ /**
+ * Sets the previous block id.
+ *
+ * @param previousBlockId previous block id.
+ */
+ public void setPreviousBlockId(long previousBlockId)
+ {
+ this.previousBlockId = previousBlockId;
+ }
+
+ @Override
+ public long getBlockId()
+ {
+ return blockId;
+ }
+
+ public void setBlockId(long blockId)
+ {
+ this.blockId = blockId;
+ }
}
/**
@@ -146,31 +198,33 @@ public interface BlockMetadata
public static class FileBlockMetadata extends AbstractBlockMetadata
{
private final String filePath;
- private final long blockId;
protected FileBlockMetadata()
{
super();
filePath = null;
- blockId = -1;
}
public FileBlockMetadata(String filePath, long blockId, long offset, long length, boolean isLastBlock, long previousBlockId)
{
- super(offset, length, isLastBlock, previousBlockId);
+ super(blockId, offset, length, isLastBlock, previousBlockId);
this.filePath = filePath;
- this.blockId = blockId;
}
- @Override
- public long getBlockId()
+ public FileBlockMetadata(String filePath)
{
- return blockId;
+ this.filePath = filePath;
}
public String getFilePath()
{
return filePath;
}
+
+ public FileBlockMetadata newInstance(@NotNull String filePath)
+ {
+ Preconditions.checkNotNull(filePath);
+ return new FileBlockMetadata(filePath);
+ }
}
}
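For context, the refactoring above moves the block id into AbstractBlockMetadata and adds setters, so a FileBlockMetadata no longer has to be fully built through a constructor. Below is a minimal sketch (not part of this commit; the path and values are made up for illustration) of building a block incrementally, the way AbstractFileSplitter#buildBlockMetadata in the next file does:

import com.datatorrent.lib.io.block.BlockMetadata;

public class BlockMetadataSketch
{
  public static void main(String[] args)
  {
    // Create the metadata from just the file path (constructor added in this commit).
    BlockMetadata.FileBlockMetadata block = new BlockMetadata.FileBlockMetadata("/tmp/input/file0.txt");

    // Populate the remaining fields through the new setters.
    block.setBlockId((1L << 32) | 0L);  // 32 bits of operator id | 32 bits of sequence no.
    block.setOffset(0L);
    block.setLength(1048576L);
    block.setLastBlock(false);
    block.setPreviousBlockId(-1L);      // -1 marks the first block of a file

    System.out.println(block.getBlockId() + " -> " + block.getFilePath());
  }
}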
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
new file mode 100644
index 0000000..b8513ee
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
@@ -0,0 +1,539 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.block.BlockMetadata;
+
+/**
+ * An abstract File Splitter.
+ */
+public abstract class AbstractFileSplitter extends BaseOperator
+{
+ protected Long blockSize;
+ private int sequenceNo;
+
+ /**
+ * This is a threshold on the no. of blocks emitted per window. A lot of blocks emitted
+ * per window can overwhelm the downstream operators. This setting helps to control that.
+ */
+ @Min(1)
+ protected int blocksThreshold;
+
+ protected transient long blockCount;
+
+ protected BlockMetadataIterator blockMetadataIterator;
+
+ protected transient int operatorId;
+ protected transient Context.OperatorContext context;
+ protected transient long currentWindowId;
+
+ @AutoMetric
+ protected int filesProcessed;
+
+ public final transient DefaultOutputPort<FileMetadata> filesMetadataOutput = new DefaultOutputPort<>();
+ public final transient DefaultOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput = new DefaultOutputPort<>();
+
+ public AbstractFileSplitter()
+ {
+ blocksThreshold = Integer.MAX_VALUE;
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ Preconditions.checkArgument(blockSize == null || blockSize > 0, "invalid block size");
+
+ operatorId = context.getId();
+ this.context = context;
+ currentWindowId = context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID);
+ if (blockSize == null) {
+ blockSize = getDefaultBlockSize();
+ }
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ filesProcessed = 0;
+ blockCount = 0;
+ currentWindowId = windowId;
+ }
+
+ protected void process()
+ {
+ if (blockMetadataIterator != null && blockCount < blocksThreshold) {
+ emitBlockMetadata();
+ }
+
+ FileInfo fileInfo;
+ while (blockCount < blocksThreshold && (fileInfo = getFileInfo()) != null) {
+ if (!processFileInfo(fileInfo)) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * @return {@link FileInfo}
+ */
+ protected abstract FileInfo getFileInfo();
+
+ /**
+ * @param fileInfo file info
+ * @return true if blocks threshold is reached; false otherwise
+ */
+ protected boolean processFileInfo(FileInfo fileInfo)
+ {
+ try {
+ FileMetadata fileMetadata = buildFileMetadata(fileInfo);
+ filesMetadataOutput.emit(fileMetadata);
+ filesProcessed++;
+ if (!fileMetadata.isDirectory()) {
+ blockMetadataIterator = new BlockMetadataIterator(this, fileMetadata, blockSize);
+ if (!emitBlockMetadata()) {
+ //block threshold reached
+ return false;
+ }
+ }
+ return true;
+ } catch (IOException e) {
+ throw new RuntimeException("creating metadata", e);
+ }
+ }
+
+ /**
+ * @return true if all the blocks were emitted; false otherwise
+ */
+ protected boolean emitBlockMetadata()
+ {
+ while (blockMetadataIterator.hasNext()) {
+ if (blockCount++ < blocksThreshold) {
+ this.blocksMetadataOutput.emit(blockMetadataIterator.next());
+ } else {
+ return false;
+ }
+ }
+ blockMetadataIterator = null;
+ return true;
+ }
+
+ /**
+ * Builds block metadata
+ *
+ * @param pos offset of the block
+ * @param lengthOfFileInBlock length of the block in file
+ * @param blockNumber block number
+ * @param fileMetadata file metadata
+ * @param isLast last block of the file
+ * @return the file block metadata
+ */
+ protected BlockMetadata.FileBlockMetadata buildBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber,
+ FileMetadata fileMetadata, boolean isLast)
+ {
+ BlockMetadata.FileBlockMetadata fileBlockMetadata = createBlockMetadata(fileMetadata);
+ fileBlockMetadata.setBlockId(fileMetadata.getBlockIds()[blockNumber - 1]);
+ fileBlockMetadata.setOffset(pos);
+ fileBlockMetadata.setLength(lengthOfFileInBlock);
+ fileBlockMetadata.setLastBlock(isLast);
+ fileBlockMetadata.setPreviousBlockId(blockNumber == 1 ? -1 : fileMetadata.getBlockIds()[blockNumber - 2]);
+
+ return fileBlockMetadata;
+ }
+
+ /**
+ * Can be overridden for creating block metadata of a type that extends {@link BlockMetadata.FileBlockMetadata}
+ */
+ protected BlockMetadata.FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
+ {
+ return new BlockMetadata.FileBlockMetadata(fileMetadata.getFilePath());
+ }
+
+ /**
+ * Creates file-metadata and populates no. of blocks in the metadata.
+ *
+ * @param fileInfo file information
+ * @return file-metadata
+ * @throws IOException
+ */
+ protected FileMetadata buildFileMetadata(FileInfo fileInfo) throws IOException
+ {
+ LOG.debug("file {}", fileInfo.getFilePath());
+ FileMetadata fileMetadata = createFileMetadata(fileInfo);
+ Path path = new Path(fileInfo.getFilePath());
+
+ fileMetadata.setFileName(path.getName());
+
+ FileStatus status = getFileStatus(path);
+ fileMetadata.setDirectory(status.isDirectory());
+ fileMetadata.setFileLength(status.getLen());
+
+ if (!status.isDirectory()) {
+ int noOfBlocks = (int)((status.getLen() / blockSize) + (((status.getLen() % blockSize) == 0) ? 0 : 1));
+ if (fileMetadata.getDataOffset() >= status.getLen()) {
+ noOfBlocks = 0;
+ }
+ fileMetadata.setNumberOfBlocks(noOfBlocks);
+ populateBlockIds(fileMetadata);
+ }
+ return fileMetadata;
+ }
+
+ /**
+ * This can be overridden to create file metadata of a type that extends {@link FileSplitterInput.FileMetadata}
+ *
+ * @param fileInfo file information
+ * @return file-metadata
+ */
+ protected FileMetadata createFileMetadata(FileInfo fileInfo)
+ {
+ return new FileMetadata(fileInfo.getFilePath());
+ }
+
+ protected void populateBlockIds(FileMetadata fileMetadata)
+ {
+ // block ids are 32 bits of operatorId | 32 bits of sequence number
+ long[] blockIds = new long[fileMetadata.getNumberOfBlocks()];
+ long longLeftSide = ((long)operatorId) << 32;
+ for (int i = 0; i < fileMetadata.getNumberOfBlocks(); i++) {
+ blockIds[i] = longLeftSide | sequenceNo++ & 0xFFFFFFFFL;
+ }
+ fileMetadata.setBlockIds(blockIds);
+ }
+
+ /**
+ * Get default block size which is used when the user hasn't specified block size.
+ *
+ * @return default block size.
+ */
+ protected abstract long getDefaultBlockSize();
+
+ /**
+ * Get status of a file.
+ *
+ * @param path path of a file
+ * @return file status
+ */
+ protected abstract FileStatus getFileStatus(Path path) throws IOException;
+
+ public void setBlockSize(Long blockSize)
+ {
+ this.blockSize = blockSize;
+ }
+
+ public Long getBlockSize()
+ {
+ return blockSize;
+ }
+
+ public void setBlocksThreshold(int threshold)
+ {
+ this.blocksThreshold = threshold;
+ }
+
+ public int getBlocksThreshold()
+ {
+ return blocksThreshold;
+ }
+
+ /**
+ * An {@link Iterator} for Block-Metadatas of a file.
+ */
+ protected static class BlockMetadataIterator implements Iterator<BlockMetadata.FileBlockMetadata>
+ {
+ private final FileMetadata fileMetadata;
+ private final long blockSize;
+
+ private long pos;
+ private int blockNumber;
+
+ private final transient AbstractFileSplitter splitter;
+
+ protected BlockMetadataIterator()
+ {
+ //for kryo
+ fileMetadata = null;
+ blockSize = -1;
+ splitter = null;
+ }
+
+ protected BlockMetadataIterator(AbstractFileSplitter splitter, FileMetadata fileMetadata, long blockSize)
+ {
+ this.splitter = splitter;
+ this.fileMetadata = fileMetadata;
+ this.blockSize = blockSize;
+ this.pos = fileMetadata.getDataOffset();
+ this.blockNumber = 0;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return pos < fileMetadata.getFileLength();
+ }
+
+ @SuppressWarnings("StatementWithEmptyBody")
+ @Override
+ public BlockMetadata.FileBlockMetadata next()
+ {
+ long length;
+ while ((length = blockSize * ++blockNumber) <= pos) {
+ }
+ boolean isLast = length >= fileMetadata.getFileLength();
+ long lengthOfFileInBlock = isLast ? fileMetadata.getFileLength() : length;
+ BlockMetadata.FileBlockMetadata fileBlock = splitter.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast);
+ pos = lengthOfFileInBlock;
+ return fileBlock;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException("remove not supported");
+ }
+ }
+
+ /**
+ * Represents the file metadata - file path, name, no. of blocks, etc.
+ */
+ public static class FileMetadata
+ {
+ @NotNull
+ private String filePath;
+ private String fileName;
+ private int numberOfBlocks;
+ private long dataOffset;
+ private long fileLength;
+ private long discoverTime;
+ private long[] blockIds;
+ private boolean isDirectory;
+
+ @SuppressWarnings("unused")
+ protected FileMetadata()
+ {
+ //for kryo
+ filePath = null;
+ discoverTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Constructs file metadata
+ *
+ * @param filePath file path
+ */
+ public FileMetadata(@NotNull String filePath)
+ {
+ this.filePath = filePath;
+ discoverTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Returns the total number of blocks.
+ */
+ public int getNumberOfBlocks()
+ {
+ return numberOfBlocks;
+ }
+
+ /**
+ * Sets the total number of blocks.
+ */
+ public void setNumberOfBlocks(int numberOfBlocks)
+ {
+ this.numberOfBlocks = numberOfBlocks;
+ }
+
+ /**
+ * Returns the file name.
+ */
+ public String getFileName()
+ {
+ return fileName;
+ }
+
+ /**
+ * Sets the file name.
+ */
+ public void setFileName(String fileName)
+ {
+ this.fileName = fileName;
+ }
+
+ /**
+ * Sets the file path.
+ */
+ public void setFilePath(String filePath)
+ {
+ this.filePath = filePath;
+ }
+
+ /**
+ * Returns the file path.
+ */
+ public String getFilePath()
+ {
+ return filePath;
+ }
+
+ /**
+ * Returns the data offset.
+ */
+ public long getDataOffset()
+ {
+ return dataOffset;
+ }
+
+ /**
+ * Sets the data offset.
+ */
+ public void setDataOffset(long offset)
+ {
+ this.dataOffset = offset;
+ }
+
+ /**
+ * Returns the file length.
+ */
+ public long getFileLength()
+ {
+ return fileLength;
+ }
+
+ /**
+ * Sets the file length.
+ */
+ public void setFileLength(long fileLength)
+ {
+ this.fileLength = fileLength;
+ }
+
+ /**
+ * Returns the file discover time.
+ */
+ public long getDiscoverTime()
+ {
+ return discoverTime;
+ }
+
+ /**
+ * Sets the discover time.
+ */
+ public void setDiscoverTime(long discoverTime)
+ {
+ this.discoverTime = discoverTime;
+ }
+
+ /**
+ * Returns the block ids associated with the file.
+ */
+ public long[] getBlockIds()
+ {
+ return blockIds;
+ }
+
+ /**
+ * Sets the blocks ids of the file.
+ */
+ public void setBlockIds(long[] blockIds)
+ {
+ this.blockIds = blockIds;
+ }
+
+ /**
+ * Sets whether the file metadata is a directory.
+ */
+ public void setDirectory(boolean isDirectory)
+ {
+ this.isDirectory = isDirectory;
+ }
+
+ /**
+ * @return true if it is a directory; false otherwise.
+ */
+ public boolean isDirectory()
+ {
+ return isDirectory;
+ }
+ }
+
+ /**
+ * A class that encapsulates file path.
+ */
+ public static class FileInfo
+ {
+ protected final String directoryPath;
+ protected final String relativeFilePath;
+
+ protected FileInfo()
+ {
+ directoryPath = null;
+ relativeFilePath = null;
+ }
+
+ public FileInfo(@Nullable String directoryPath, @NotNull String relativeFilePath)
+ {
+ this.directoryPath = directoryPath;
+ this.relativeFilePath = relativeFilePath;
+ }
+
+ /**
+ * @return directory path
+ */
+ public String getDirectoryPath()
+ {
+ return directoryPath;
+ }
+
+ /**
+ * @return path relative to directory
+ */
+ public String getRelativeFilePath()
+ {
+ return relativeFilePath;
+ }
+
+ /**
+ * @return full path of the file
+ */
+ public String getFilePath()
+ {
+ if (directoryPath == null) {
+ return relativeFilePath;
+ }
+ return new Path(directoryPath, relativeFilePath).toUri().getPath();
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractFileSplitter.class);
+}
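As an aside, the block count arithmetic in buildFileMetadata() and the block id packing in populateBlockIds() above can be reproduced in a tiny standalone sketch; the operator id, file length and block size below are arbitrary example values, not anything this commit prescribes:

public class BlockIdSketch
{
  public static void main(String[] args)
  {
    int operatorId = 3;     // example operator id
    long fileLength = 10L;  // example file length in bytes
    long blockSize = 4L;    // example block size

    // Same computation as buildFileMetadata(): ceil(fileLength / blockSize).
    int noOfBlocks = (int)((fileLength / blockSize) + ((fileLength % blockSize == 0) ? 0 : 1));
    System.out.println("no. of blocks = " + noOfBlocks);  // 3

    // Same packing as populateBlockIds(): 32 bits of operatorId | 32 bits of sequence number.
    int sequenceNo = 0;
    long longLeftSide = ((long)operatorId) << 32;
    for (int i = 0; i < noOfBlocks; i++) {
      long blockId = longLeftSide | (sequenceNo++ & 0xFFFFFFFFL);
      System.out.println(Long.toHexString(blockId));  // 300000000, 300000001, 300000002
    }
  }
}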
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
index 58554c3..874d486 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
@@ -59,14 +59,15 @@ import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
* The operator emits block metadata and file metadata.<br/>
*
* The file system/directory space should be different for different partitions of file splitter.
- * The scanning of
*
+ * @deprecated use {@link FileSplitterInput}. This splitter has issues with recovery, and fixing them would break backward compatibility.
* @displayName File Splitter
* @category Input
* @tags file
* @since 2.0.0
*/
@OperatorAnnotation(checkpointableWithinAppWindow = false)
+@Deprecated
public class FileSplitter implements InputOperator, Operator.CheckpointListener
{
protected Long blockSize;
@@ -454,6 +455,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
/**
* Represents the file metadata - file path, name, no. of blocks, etc.
*/
+ @Deprecated
public static class FileMetadata
{
@NotNull
@@ -614,6 +616,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
}
}
+ @Deprecated
public static class TimeBasedDirectoryScanner implements Component<Context.OperatorContext>, Runnable
{
private static long DEF_SCAN_INTERVAL_MILLIS = 5000;
@@ -939,6 +942,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
/**
* A class that represents the file discovered by time-based scanner.
*/
+ @Deprecated
protected static class FileInfo
{
protected final String directoryPath;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterBase.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterBase.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterBase.java
new file mode 100644
index 0000000..005377c
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterBase.java
@@ -0,0 +1,142 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
+/**
+ * A file splitter that receives its input from an upstream operator.
+ */
+public class FileSplitterBase extends AbstractFileSplitter implements Operator.IdleTimeHandler
+{
+ @NotNull
+ protected String file;
+ protected transient FileSystem fs;
+
+ protected final LinkedList<FileInfo> fileInfos;
+ protected transient int sleepTimeMillis;
+
+ public FileSplitterBase()
+ {
+ fileInfos = new LinkedList<>();
+ }
+
+ public final transient DefaultInputPort<FileInfo> input = new DefaultInputPort<FileInfo>()
+ {
+ @Override
+ public void process(FileInfo fileInfo)
+ {
+ fileInfos.add(fileInfo);
+ FileSplitterBase.this.process();
+ }
+ };
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ sleepTimeMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+ try {
+ fs = getFSInstance();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ super.setup(context);
+ }
+
+ protected FileSystem getFSInstance() throws IOException
+ {
+ return FileSystem.newInstance(new Path(file).toUri(), new Configuration());
+ }
+
+ @Override
+ protected FileInfo getFileInfo()
+ {
+ if (fileInfos.size() > 0) {
+ return fileInfos.remove();
+ }
+ return null;
+ }
+
+ @Override
+ public void handleIdleTime()
+ {
+ if (blockCount < blocksThreshold && (blockMetadataIterator != null || fileInfos.size() > 0)) {
+ process();
+ } else {
+ /* nothing to do here, so sleep for a while to avoid busy loop */
+ try {
+ Thread.sleep(sleepTimeMillis);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ }
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ super.teardown();
+ try {
+ fs.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected long getDefaultBlockSize()
+ {
+ return fs.getDefaultBlockSize(new Path(file));
+ }
+
+ @Override
+ protected FileStatus getFileStatus(Path path) throws IOException
+ {
+ return fs.getFileStatus(path);
+ }
+
+ /**
+ * File path from which the File System is inferred.
+ *
+ * @param file file path
+ */
+ public void setFile(@NotNull String file)
+ {
+ this.file = Preconditions.checkNotNull(file, "file path");
+ }
+
+ /**
+ * @return file path from which the File System is inferred.
+ */
+ public String getFile()
+ {
+ return file;
+ }
+}
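For illustration only, here is a minimal sketch of how FileSplitterBase might be wired downstream of an operator that emits FileInfo tuples. The FileDiscoverer operator, the directory path and the threshold value are hypothetical placeholders, not part of this commit:

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.io.fs.AbstractFileSplitter;
import com.datatorrent.lib.io.fs.FileSplitterBase;

public class SplitterBaseWiringSketch implements StreamingApplication
{
  /** Hypothetical upstream operator that emits one FileInfo per discovered file. */
  public static class FileDiscoverer extends BaseOperator implements InputOperator
  {
    public final transient DefaultOutputPort<AbstractFileSplitter.FileInfo> files = new DefaultOutputPort<>();
    private boolean done;

    @Override
    public void emitTuples()
    {
      if (!done) {
        done = true;
        files.emit(new AbstractFileSplitter.FileInfo("/user/apex/input", "file0.txt"));
      }
    }
  }

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    FileDiscoverer finder = dag.addOperator("Finder", new FileDiscoverer());

    FileSplitterBase splitter = dag.addOperator("Splitter", new FileSplitterBase());
    splitter.setFile("/user/apex/input");  // path used only to infer the file system
    splitter.setBlocksThreshold(16);       // cap on blocks emitted per window

    dag.addStream("files", finder.files, splitter.input);
    // splitter.filesMetadataOutput and splitter.blocksMetadataOutput feed downstream readers.
  }
}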
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
new file mode 100644
index 0000000..a381be5
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -0,0 +1,594 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.Stateless;
+
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Input operator that scans a directory for files and splits a file into blocks.<br/>
+ * The operator emits block metadata and file metadata.<br/>
+ *
+ * The file system/directory space should be different for different partitions of file splitter.
+ * The scanning of the directories is done asynchronously by the {@link TimeBasedDirectoryScanner}.
+ *
+ * @displayName File Splitter
+ * @category Input
+ * @tags file
+ * @since 2.0.0
+ */
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener
+{
+ @NotNull
+ private IdempotentStorageManager idempotentStorageManager;
+ @NotNull
+ protected final transient LinkedList<ScannedFileInfo> currentWindowRecoveryState;
+
+ @NotNull
+ private TimeBasedDirectoryScanner scanner;
+ @NotNull
+ private Map<String, Long> referenceTimes;
+
+ private transient long sleepMillis;
+
+ public FileSplitterInput()
+ {
+ super();
+ currentWindowRecoveryState = Lists.newLinkedList();
+ idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+ referenceTimes = Maps.newHashMap();
+ scanner = new TimeBasedDirectoryScanner();
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+ scanner.setup(context);
+ idempotentStorageManager.setup(context);
+ super.setup(context);
+
+ long largestRecoveryWindow = idempotentStorageManager.getLargestRecoveryWindow();
+ if (largestRecoveryWindow == Stateless.WINDOW_ID || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
+ scanner.startScanning(Collections.unmodifiableMap(referenceTimes));
+ }
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+ replay(windowId);
+ }
+ }
+
+ protected void replay(long windowId)
+ {
+ try {
+ @SuppressWarnings("unchecked")
+ LinkedList<ScannedFileInfo> recoveredData = (LinkedList<ScannedFileInfo>)idempotentStorageManager.load(operatorId, windowId);
+ if (recoveredData == null) {
+ //This could happen when there are multiple physical instances and one of them is ahead in processing windows.
+ return;
+ }
+ if (blockMetadataIterator != null) {
+ emitBlockMetadata();
+ }
+ for (ScannedFileInfo info : recoveredData) {
+ updateReferenceTimes(info);
+ FileMetadata fileMetadata = buildFileMetadata(info);
+ filesMetadataOutput.emit(fileMetadata);
+
+ blockMetadataIterator = new BlockMetadataIterator(this, fileMetadata, blockSize);
+ if (!emitBlockMetadata()) {
+ break;
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("replay", e);
+ }
+ if (windowId == idempotentStorageManager.getLargestRecoveryWindow()) {
+ scanner.startScanning(Collections.unmodifiableMap(referenceTimes));
+ }
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+ return;
+ }
+
+ Throwable throwable;
+ if ((throwable = scanner.atomicThrowable.get()) != null) {
+ DTThrowable.rethrow(throwable);
+ }
+ if (blockMetadataIterator == null && scanner.discoveredFiles.isEmpty()) {
+ try {
+ Thread.sleep(sleepMillis);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("waiting for work", e);
+ }
+ }
+ process();
+ }
+
+ @Override
+ protected FileInfo getFileInfo()
+ {
+ return scanner.pollFile();
+ }
+
+ @Override
+ protected boolean processFileInfo(FileInfo fileInfo)
+ {
+ ScannedFileInfo scannedFileInfo = (ScannedFileInfo)fileInfo;
+ currentWindowRecoveryState.add(scannedFileInfo);
+ updateReferenceTimes(scannedFileInfo);
+ return super.processFileInfo(fileInfo) && !scannedFileInfo.lastFileOfScan;
+ }
+
+ protected void updateReferenceTimes(ScannedFileInfo fileInfo)
+ {
+ referenceTimes.put(fileInfo.getFilePath(), fileInfo.modifiedTime);
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) {
+ try {
+ idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
+ } catch (IOException e) {
+ throw new RuntimeException("saving recovery", e);
+ }
+ }
+ currentWindowRecoveryState.clear();
+ }
+
+ @Override
+ protected long getDefaultBlockSize()
+ {
+ return scanner.fs.getDefaultBlockSize(new Path(scanner.files.iterator().next()));
+ }
+
+ @Override
+ protected FileStatus getFileStatus(Path path) throws IOException
+ {
+ return scanner.fs.getFileStatus(path);
+ }
+
+ @Override
+ public void checkpointed(long l)
+ {
+ }
+
+ @Override
+ public void committed(long l)
+ {
+ try {
+ idempotentStorageManager.deleteUpTo(operatorId, l);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ scanner.teardown();
+ }
+
+ public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager)
+ {
+ this.idempotentStorageManager = idempotentStorageManager;
+ }
+
+ public IdempotentStorageManager getIdempotentStorageManager()
+ {
+ return this.idempotentStorageManager;
+ }
+
+ public void setScanner(TimeBasedDirectoryScanner scanner)
+ {
+ this.scanner = scanner;
+ }
+
+ public TimeBasedDirectoryScanner getScanner()
+ {
+ return this.scanner;
+ }
+
+ public static class TimeBasedDirectoryScanner implements Runnable, Component<Context.OperatorContext>
+ {
+ private static long DEF_SCAN_INTERVAL_MILLIS = 5000;
+
+ private boolean recursive;
+
+ private transient volatile boolean trigger;
+
+ @NotNull
+ @Size(min = 1)
+ private final Set<String> files;
+
+ @Min(0)
+ private long scanIntervalMillis;
+ private String filePatternRegularExp;
+
+ protected transient long lastScanMillis;
+ protected transient FileSystem fs;
+ protected final transient LinkedBlockingDeque<ScannedFileInfo> discoveredFiles;
+ protected final transient ExecutorService scanService;
+ protected final transient AtomicReference<Throwable> atomicThrowable;
+
+ private transient volatile boolean running;
+ protected final transient HashSet<String> ignoredFiles;
+ protected transient Pattern regex;
+ protected transient long sleepMillis;
+ protected transient Map<String, Long> referenceTimes;
+
+ private transient ScannedFileInfo lastScannedInfo;
+
+ public TimeBasedDirectoryScanner()
+ {
+ recursive = true;
+ scanIntervalMillis = DEF_SCAN_INTERVAL_MILLIS;
+ files = Sets.newLinkedHashSet();
+ scanService = Executors.newSingleThreadExecutor();
+ discoveredFiles = new LinkedBlockingDeque<>();
+ atomicThrowable = new AtomicReference<>();
+ ignoredFiles = Sets.newHashSet();
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+ if (filePatternRegularExp != null) {
+ regex = Pattern.compile(filePatternRegularExp);
+ }
+ try {
+ fs = getFSInstance();
+ } catch (IOException e) {
+ throw new RuntimeException("opening fs", e);
+ }
+ }
+
+ protected void startScanning(Map<String, Long> referenceTimes)
+ {
+ this.referenceTimes = Preconditions.checkNotNull(referenceTimes);
+ scanService.submit(this);
+ }
+
+ @Override
+ public void teardown()
+ {
+ running = false;
+ scanService.shutdownNow();
+ try {
+ fs.close();
+ } catch (IOException e) {
+ throw new RuntimeException("closing fs", e);
+ }
+ }
+
+ protected FileSystem getFSInstance() throws IOException
+ {
+ return FileSystem.newInstance(new Path(files.iterator().next()).toUri(), new Configuration());
+ }
+
+ @Override
+ public void run()
+ {
+ running = true;
+ try {
+ while (running) {
+ if ((trigger || (System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis)) &&
+ (lastScannedInfo == null || referenceTimes.get(lastScannedInfo.getFilePath()) != null)) {
+ trigger = false;
+ for (String afile : files) {
+ scan(new Path(afile), null);
+ }
+ scanComplete();
+ } else {
+ Thread.sleep(sleepMillis);
+ }
+ }
+ } catch (Throwable throwable) {
+ LOG.error("service", throwable);
+ running = false;
+ atomicThrowable.set(throwable);
+ DTThrowable.rethrow(throwable);
+ }
+ }
+
+ /**
+ * Operations that need to be done once a scan is complete.
+ */
+ protected void scanComplete()
+ {
+ LOG.debug("scan complete {}", lastScanMillis);
+ ScannedFileInfo fileInfo = discoveredFiles.peekLast();
+ if (fileInfo != null) {
+ fileInfo.lastFileOfScan = true;
+ lastScannedInfo = fileInfo;
+ }
+ lastScanMillis = System.currentTimeMillis();
+ }
+
+ protected void scan(@NotNull Path filePath, Path rootPath)
+ {
+ try {
+ FileStatus parentStatus = fs.getFileStatus(filePath);
+ String parentPathStr = filePath.toUri().getPath();
+
+ LOG.debug("scan {}", parentPathStr);
+
+ FileStatus[] childStatuses = fs.listStatus(filePath);
+ for (FileStatus status : childStatuses) {
+ Path childPath = status.getPath();
+ ScannedFileInfo info = createScannedFileInfo(filePath, parentStatus, childPath, status, rootPath);
+
+ if (skipFile(childPath, status.getModificationTime(), referenceTimes.get(info.getFilePath()))) {
+ continue;
+ }
+
+ if (status.isDirectory()) {
+ if (recursive) {
+ scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath);
+ }
+ }
+
+ String childPathStr = childPath.toUri().getPath();
+ if (ignoredFiles.contains(childPathStr)) {
+ continue;
+ }
+ if (acceptFile(childPathStr)) {
+ LOG.debug("found {}", childPathStr);
+ discoveredFiles.add(info);
+ } else {
+ // don't look at it again
+ ignoredFiles.add(childPathStr);
+ }
+ }
+ } catch (FileNotFoundException fnf) {
+ LOG.warn("Failed to list directory {}", filePath, fnf);
+ } catch (IOException e) {
+ throw new RuntimeException("listing files", e);
+ }
+ }
+
+ protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath, @SuppressWarnings("UnusedParameters") FileStatus childStatus, Path rootPath)
+ {
+ ScannedFileInfo info;
+ if (rootPath == null) {
+ info = parentStatus.isDirectory() ?
+ new ScannedFileInfo(parentPath.toUri().getPath(), childPath.getName(), parentStatus.getModificationTime()) :
+ new ScannedFileInfo(null, childPath.toUri().getPath(), parentStatus.getModificationTime());
+ } else {
+ URI relativeChildURI = rootPath.toUri().relativize(childPath.toUri());
+ info = new ScannedFileInfo(rootPath.toUri().getPath(), relativeChildURI.getPath(), parentStatus.getModificationTime());
+ }
+ return info;
+ }
+
+ /**
+ * Skips file/directory based on their modification time.<br/>
+ *
+ * @param path file path
+ * @param modificationTime modification time
+ * @param lastModificationTime last cached directory modification time
+ * @return true to skip; false otherwise.
+ * @throws IOException
+ */
+ protected static boolean skipFile(@SuppressWarnings("unused") @NotNull Path path, @NotNull Long modificationTime,
+ Long lastModificationTime) throws IOException
+ {
+ return (!(lastModificationTime == null || modificationTime > lastModificationTime));
+ }
+
+ /**
+ * Accepts files whose paths match the regular expression pattern.
+ *
+ * @param filePathStr file path
+ * @return true if the path matches the pattern; false otherwise.
+ */
+ protected boolean acceptFile(String filePathStr)
+ {
+ if (regex != null) {
+ Matcher matcher = regex.matcher(filePathStr);
+ if (!matcher.matches()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public FileInfo pollFile()
+ {
+ return discoveredFiles.poll();
+ }
+
+ /**
+ * Gets the regular expression for file names to split.
+ *
+ * @return regular expression
+ */
+ public String getFilePatternRegularExp()
+ {
+ return filePatternRegularExp;
+ }
+
+ /**
+ * Only files with names matching the given java regular expression are split.
+ *
+ * @param filePatternRegexp regular expression
+ */
+ public void setFilePatternRegularExp(String filePatternRegexp)
+ {
+ this.filePatternRegularExp = filePatternRegexp;
+ }
+
+ /**
+ * A comma separated list of directories to scan. If the path is not fully qualified the default
+ * file system is used. A fully qualified path can be provided to scan directories in other filesystems.
+ *
+ * @param files files
+ */
+ public void setFiles(String files)
+ {
+ Iterables.addAll(this.files, Splitter.on(",").omitEmptyStrings().split(files));
+ }
+
+ /**
+ * Gets the files to be scanned.
+ *
+ * @return files to be scanned.
+ */
+ public String getFiles()
+ {
+ return Joiner.on(",").join(this.files);
+ }
+
+ /**
+ * Sets whether the scan is recursive.
+ *
+ * @param recursive true if recursive; false otherwise.
+ */
+ public void setRecursive(boolean recursive)
+ {
+ this.recursive = recursive;
+ }
+
+ /**
+ * Returns whether the scan is recursive.
+ *
+ * @return true if recursive; false otherwise.
+ */
+ public boolean isRecursive()
+ {
+ return this.recursive;
+ }
+
+ /**
+ * Sets the trigger which will initiate scan.
+ *
+ * @param trigger true to initiate a scan in the next iteration of the scan loop.
+ */
+ public void setTrigger(boolean trigger)
+ {
+ this.trigger = trigger;
+ }
+
+ /**
+ * The trigger which will initiate scan.
+ *
+ * @return trigger
+ */
+ public boolean isTrigger()
+ {
+ return this.trigger;
+ }
+
+ /**
+ * Returns the frequency with which new files are scanned for in milliseconds.
+ *
+ * @return The scan interval in milliseconds.
+ */
+ public long getScanIntervalMillis()
+ {
+ return scanIntervalMillis;
+ }
+
+ /**
+ * Sets the frequency with which new files are scanned for in milliseconds.
+ *
+ * @param scanIntervalMillis The scan interval in milliseconds.
+ */
+ public void setScanIntervalMillis(long scanIntervalMillis)
+ {
+ this.scanIntervalMillis = scanIntervalMillis;
+ }
+ }
+
+ /**
+ * File info created for files discovered by scanner
+ */
+ public static class ScannedFileInfo extends AbstractFileSplitter.FileInfo
+ {
+ protected final long modifiedTime;
+ private transient boolean lastFileOfScan;
+
+ private ScannedFileInfo()
+ {
+ super();
+ modifiedTime = -1;
+ }
+
+ public ScannedFileInfo(@Nullable String directoryPath, @NotNull String relativeFilePath, long modifiedTime)
+ {
+ super(directoryPath, relativeFilePath);
+ this.modifiedTime = modifiedTime;
+ }
+
+ protected boolean isLastFileOfScan()
+ {
+ return lastFileOfScan;
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInput.class);
+}
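Again purely as a sketch (not part of this commit), this is how the new FileSplitterInput and its TimeBasedDirectoryScanner might be configured in an application; the directory, pattern and interval below are placeholder values:

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.IdempotentStorageManager;
import com.datatorrent.lib.io.fs.FileSplitterInput;

public class SplitterInputConfigSketch implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    FileSplitterInput input = dag.addOperator("FileSplitter", new FileSplitterInput());

    // Comma separated directories to scan, file name filter and scan frequency.
    input.getScanner().setFiles("/user/apex/input");
    input.getScanner().setFilePatternRegularExp(".*\\.txt");
    input.getScanner().setScanIntervalMillis(10000);

    // Recovery state for idempotent replay of discovered files (the FS backed manager is the default).
    input.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());

    // input.filesMetadataOutput and input.blocksMetadataOutput would be connected to
    // downstream block readers here.
  }
}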
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
new file mode 100644
index 0000000..61ccfad
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
@@ -0,0 +1,264 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.*;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.*;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Tests for {@link FileSplitterBase}
+ */
+public class FileSplitterBaseTest
+{
+ @ClassRule
+ public static FileSplitterInputTest.TestClassMeta classTestMeta = new FileSplitterInputTest.TestClassMeta();
+
+ static class BastTestMeta extends TestWatcher
+ {
+ public String dataDirectory;
+
+ FileSplitterBase fileSplitter;
+ CollectorTestSink<FileSplitterInput.FileMetadata> fileMetadataSink;
+ CollectorTestSink<BlockMetadata.FileBlockMetadata> blockMetadataSink;
+ Set<String> filePaths;
+ Context.OperatorContext context;
+
+ @Override
+ protected void starting(org.junit.runner.Description description)
+ {
+
+ String methodName = description.getMethodName();
+ String className = description.getClassName();
+ this.dataDirectory = "target/" + className + "/" + methodName;
+ try {
+ filePaths = FileSplitterInputTest.createData(this.dataDirectory);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ fileSplitter = new FileSplitterBase();
+ fileSplitter.setFile(this.dataDirectory);
+
+ Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(Context.OperatorContext.SPIN_MILLIS, 500);
+
+ context = new OperatorContextTestHelper.TestIdOperatorContext(0, attributes);
+ fileSplitter.setup(context);
+
+ fileMetadataSink = new CollectorTestSink<>();
+ TestUtils.setSink(fileSplitter.filesMetadataOutput, fileMetadataSink);
+
+ blockMetadataSink = new CollectorTestSink<>();
+ TestUtils.setSink(fileSplitter.blocksMetadataOutput, blockMetadataSink);
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ this.fileSplitter.teardown();
+ }
+ }
+
+ @Rule
+ public BastTestMeta baseTestMeta = new BastTestMeta();
+
+ @Test
+ public void testFileMetadata() throws InterruptedException
+ {
+ baseTestMeta.fileSplitter.beginWindow(1);
+ for (String filePath : baseTestMeta.filePaths) {
+ baseTestMeta.fileSplitter.input.process(new FileSplitterInput.FileInfo(null, filePath));
+ }
+ baseTestMeta.fileSplitter.endWindow();
+ Assert.assertEquals("File metadata", 12, baseTestMeta.fileMetadataSink.collectedTuples.size());
+ for (Object fileMetadata : baseTestMeta.fileMetadataSink.collectedTuples) {
+ FileSplitterInput.FileMetadata metadata = (FileSplitterInput.FileMetadata)fileMetadata;
+ Assert.assertTrue("path: " + metadata.getFilePath(), baseTestMeta.filePaths.contains(metadata.getFilePath()));
+ Assert.assertNotNull("name: ", metadata.getFileName());
+ }
+
+ baseTestMeta.fileMetadataSink.collectedTuples.clear();
+ }
+
+ @Test
+ public void testBlockMetadataNoSplit() throws InterruptedException
+ {
+ baseTestMeta.fileSplitter.beginWindow(1);
+ for (String filePath : baseTestMeta.filePaths) {
+ baseTestMeta.fileSplitter.input.process(new FileSplitterInput.FileInfo(null, filePath));
+ }
+ Assert.assertEquals("Blocks", 12, baseTestMeta.blockMetadataSink.collectedTuples.size());
+ for (Object blockMetadata : baseTestMeta.blockMetadataSink.collectedTuples) {
+ BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
+ Assert.assertTrue("path: " + metadata.getFilePath(), baseTestMeta.filePaths.contains(metadata.getFilePath()));
+ }
+ }
+
+ @Test
+ public void testBlockMetadataWithSplit() throws InterruptedException
+ {
+ baseTestMeta.fileSplitter.setBlockSize(2L);
+ baseTestMeta.fileSplitter.beginWindow(1);
+ for (String filePath : baseTestMeta.filePaths) {
+ baseTestMeta.fileSplitter.input.process(new FileSplitterInput.FileInfo(null, filePath));
+ }
+ Assert.assertEquals("Files", 12, baseTestMeta.fileMetadataSink.collectedTuples.size());
+
+ int noOfBlocks = 0;
+ for (int i = 0; i < 12; i++) {
+ FileSplitterInput.FileMetadata fm = baseTestMeta.fileMetadataSink.collectedTuples.get(i);
+ File testFile = new File(baseTestMeta.dataDirectory, fm.getFileName());
+ noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
+ }
+ Assert.assertEquals("Blocks", noOfBlocks, baseTestMeta.blockMetadataSink.collectedTuples.size());
+ }
+
+ @Test
+ public void testBlocksThreshold() throws InterruptedException
+ {
+ int noOfBlocks = 0;
+ for (int i = 0; i < 12; i++) {
+ File testFile = new File(baseTestMeta.dataDirectory, "file" + i + ".txt");
+ noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
+ }
+
+ baseTestMeta.fileSplitter.setBlockSize(2L);
+ baseTestMeta.fileSplitter.setBlocksThreshold(10);
+ baseTestMeta.fileSplitter.beginWindow(1);
+
+ for (String filePath : baseTestMeta.filePaths) {
+ baseTestMeta.fileSplitter.input.process(new FileSplitterInput.FileInfo(null, filePath));
+ }
+ baseTestMeta.fileSplitter.endWindow();
+
+ Assert.assertEquals("Blocks", 10, baseTestMeta.blockMetadataSink.collectedTuples.size());
+
+ for (int window = 2; window < 8; window++) {
+ baseTestMeta.fileSplitter.beginWindow(window);
+ baseTestMeta.fileSplitter.handleIdleTime();
+ baseTestMeta.fileSplitter.endWindow();
+ }
+
+ Assert.assertEquals("Files", 12, baseTestMeta.fileMetadataSink.collectedTuples.size());
+ Assert.assertEquals("Blocks", noOfBlocks, baseTestMeta.blockMetadataSink.collectedTuples.size());
+ }
+
+ @Test
+ public void testSplitterInApp() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ SplitterApp app = new SplitterApp();
+ lma.prepareDAG(app, new Configuration());
+ lma.cloneDAG(); // check serialization
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+ app.receiver.latch.await();
+ Assert.assertEquals("no. of metadata", 12, app.receiver.count);
+ FileUtils.deleteQuietly(new File("target/SplitterInApp"));
+ }
+
+ @ApplicationAnnotation(name = "TestApp")
+ class SplitterApp implements StreamingApplication
+ {
+ MockReceiver receiver;
+
+ @Override
+ public void populateDAG(DAG dag, Configuration configuration)
+ {
+ dag.setAttribute(DAG.APPLICATION_PATH, "target/SplitterInApp");
+ MockFileInput fileInput = dag.addOperator("Input", new MockFileInput());
+ fileInput.filePaths = baseTestMeta.filePaths;
+
+ FileSplitterBase splitter = dag.addOperator("Splitter", new FileSplitterBase());
+ splitter.setFile(baseTestMeta.dataDirectory);
+
+ receiver = dag.addOperator("Receiver", new MockReceiver());
+
+ dag.addStream("files", fileInput.files, splitter.input);
+ dag.addStream("file-metadata", splitter.filesMetadataOutput, receiver.fileMetadata);
+ }
+ }
+
+ static class MockReceiver extends BaseOperator implements StatsListener
+ {
+ @AutoMetric
+ int count;
+
+ transient CountDownLatch latch = new CountDownLatch(1);
+ public final transient DefaultInputPort<FileSplitterInput.FileMetadata> fileMetadata = new DefaultInputPort<FileSplitterInput.FileMetadata>()
+ {
+ @Override
+ public void process(FileSplitterInput.FileMetadata fileMetadata)
+ {
+ count++;
+ LOG.debug("count {}", count);
+ }
+ };
+
+ @Override
+ public Response processStats(BatchedOperatorStats stats)
+ {
+ Stats.OperatorStats operatorStats = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1);
+ count = (Integer)operatorStats.metrics.get("count");
+ if (count == 12) {
+ latch.countDown();
+ }
+ return null;
+ }
+ }
+
+ static class MockFileInput extends BaseOperator implements InputOperator
+ {
+
+ public final transient DefaultOutputPort<FileSplitterInput.FileInfo> files = new DefaultOutputPort<>();
+
+ protected Set<String> filePaths;
+
+ protected boolean done;
+
+ @Override
+ public void emitTuples()
+ {
+ if (!done) {
+ done = true;
+ for (String file : filePaths) {
+ files.emit(new FileSplitterInput.FileInfo(null, file));
+ }
+ }
+ }
+ }
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(FileSplitterBaseTest.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
new file mode 100644
index 0000000..8dfea7a
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -0,0 +1,486 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.junit.*;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Tests for {@link FileSplitterInput}
+ */
+public class FileSplitterInputTest
+{
+
+ public static class TestClassMeta extends TestWatcher
+ {
+ @Override
+ protected void finished(Description description)
+ {
+ try {
+ FileContext.getLocalFSFileContext().delete(new Path(new File("target/" + description.getClassName()).getAbsolutePath()), true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ static Set<String> createData(String dataDirectory) throws IOException
+ {
+ Set<String> filePaths = Sets.newHashSet();
+ FileContext.getLocalFSFileContext().delete(new Path(new File(dataDirectory).getAbsolutePath()), true);
+ HashSet<String> allLines = Sets.newHashSet();
+ for (int file = 0; file < 12; file++) {
+ HashSet<String> lines = Sets.newHashSet();
+ for (int line = 0; line < 2; line++) {
+ lines.add("f" + file + "l" + line);
+ }
+ allLines.addAll(lines);
+ File created = new File(dataDirectory, "file" + file + ".txt");
+ filePaths.add(new Path(dataDirectory, created.getName()).toUri().toString());
+ FileUtils.write(created, StringUtils.join(lines, '\n'));
+ }
+ return filePaths;
+ }
+
+ public static class TestMeta extends TestWatcher
+ {
+ public String dataDirectory;
+
+ FileSplitterInput fileSplitterInput;
+ CollectorTestSink<FileSplitterInput.FileMetadata> fileMetadataSink;
+ CollectorTestSink<BlockMetadata.FileBlockMetadata> blockMetadataSink;
+ Set<String> filePaths;
+ Context.OperatorContext context;
+ MockScanner scanner;
+
+ @Override
+ protected void starting(org.junit.runner.Description description)
+ {
+
+ String methodName = description.getMethodName();
+ String className = description.getClassName();
+ this.dataDirectory = "target/" + className + "/" + methodName + "/data";
+ try {
+ filePaths = createData(this.dataDirectory);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ fileSplitterInput = new FileSplitterInput();
+ scanner = new MockScanner();
+ fileSplitterInput.setScanner(scanner);
+ fileSplitterInput.getScanner().setScanIntervalMillis(500);
+ fileSplitterInput.getScanner().setFilePatternRegularExp(".*[.]txt");
+ fileSplitterInput.getScanner().setFiles(dataDirectory);
+ fileSplitterInput.setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager());
+
+ Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(Context.DAGContext.APPLICATION_PATH, "target/" + className + "/" + methodName + "/" + Long.toHexString(System.currentTimeMillis()));
+
+ context = new OperatorContextTestHelper.TestIdOperatorContext(0, attributes);
+ fileSplitterInput.setup(context);
+
+ fileMetadataSink = new CollectorTestSink<>();
+ TestUtils.setSink(fileSplitterInput.filesMetadataOutput, fileMetadataSink);
+
+ blockMetadataSink = new CollectorTestSink<>();
+ TestUtils.setSink(fileSplitterInput.blocksMetadataOutput, blockMetadataSink);
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ this.fileSplitterInput.teardown();
+ }
+ }
+
+ @ClassRule
+ public static TestClassMeta classTestMeta = new TestClassMeta();
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+ @Test
+ public void testFileMetadata() throws InterruptedException
+ {
+ testMeta.fileSplitterInput.beginWindow(1);
+ testMeta.scanner.semaphore.acquire();
+
+ testMeta.fileSplitterInput.emitTuples();
+ testMeta.fileSplitterInput.endWindow();
+ Assert.assertEquals("File metadata", 12, testMeta.fileMetadataSink.collectedTuples.size());
+ for (Object fileMetadata : testMeta.fileMetadataSink.collectedTuples) {
+ FileSplitterInput.FileMetadata metadata = (FileSplitterInput.FileMetadata)fileMetadata;
+ Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
+ Assert.assertNotNull("name: ", metadata.getFileName());
+ }
+
+ testMeta.fileMetadataSink.collectedTuples.clear();
+ }
+
+ @Test
+ public void testBlockMetadataNoSplit() throws InterruptedException
+ {
+ testMeta.fileSplitterInput.beginWindow(1);
+ testMeta.scanner.semaphore.acquire();
+
+ testMeta.fileSplitterInput.emitTuples();
+ Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
+ for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
+ BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
+ Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
+ }
+ }
+
+ @Test
+ public void testBlockMetadataWithSplit() throws InterruptedException
+ {
+ testMeta.fileSplitterInput.setBlockSize(2L);
+ testMeta.fileSplitterInput.beginWindow(1);
+ testMeta.scanner.semaphore.acquire();
+
+ testMeta.fileSplitterInput.emitTuples();
+ Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
+
+ int noOfBlocks = 0;
+ for (int i = 0; i < 12; i++) {
+ FileSplitterInput.FileMetadata fm = testMeta.fileMetadataSink.collectedTuples.get(i);
+ File testFile = new File(testMeta.dataDirectory, fm.getFileName());
+ noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
+ }
+ Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size());
+ }
+
+ @Test
+ public void testIdempotency() throws InterruptedException
+ {
+ IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager =
+ new IdempotentStorageManager.FSIdempotentStorageManager();
+ testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager);
+
+ testMeta.fileSplitterInput.setup(testMeta.context);
+    //emits window 1 tuples from the data directory
+ testFileMetadata();
+ testMeta.fileMetadataSink.clear();
+ testMeta.blockMetadataSink.clear();
+
+ testMeta.fileSplitterInput.setup(testMeta.context);
+ testMeta.fileSplitterInput.beginWindow(1);
+ Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
+ for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
+ BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
+ Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
+ }
+ }
+
+ @Test
+ public void testTimeScan() throws InterruptedException, IOException, TimeoutException
+ {
+ testFileMetadata();
+ testMeta.fileMetadataSink.clear();
+ testMeta.blockMetadataSink.clear();
+
+ Thread.sleep(1000);
+    //add a new file
+ File f13 = new File(testMeta.dataDirectory, "file13" + ".txt");
+ HashSet<String> lines = Sets.newHashSet();
+ for (int line = 0; line < 2; line++) {
+ lines.add("f13" + "l" + line);
+ }
+ FileUtils.write(f13, StringUtils.join(lines, '\n'));
+
+ //window 2
+ testMeta.fileSplitterInput.beginWindow(2);
+ testMeta.scanner.semaphore.acquire();
+ testMeta.fileSplitterInput.emitTuples();
+ testMeta.fileSplitterInput.endWindow();
+
+ Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+ Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
+ }
+
+ @Test
+ public void testTrigger() throws InterruptedException, IOException, TimeoutException
+ {
+ testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000);
+ testFileMetadata();
+ testMeta.fileMetadataSink.clear();
+ testMeta.blockMetadataSink.clear();
+
+ Thread.sleep(1000);
+    //add a new file
+ File f13 = new File(testMeta.dataDirectory, "file13" + ".txt");
+ HashSet<String> lines = Sets.newHashSet();
+ for (int line = 0; line < 2; line++) {
+ lines.add("f13" + "l" + line);
+ }
+ FileUtils.write(f13, StringUtils.join(lines, '\n'));
+ testMeta.fileSplitterInput.getScanner().setTrigger(true);
+
+ //window 2
+ testMeta.fileSplitterInput.beginWindow(2);
+ testMeta.scanner.semaphore.acquire();
+ testMeta.fileSplitterInput.emitTuples();
+ testMeta.fileSplitterInput.endWindow();
+
+ Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+ Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
+ }
+
+ @Test
+ public void testBlocksThreshold() throws InterruptedException
+ {
+ int noOfBlocks = 0;
+ for (int i = 0; i < 12; i++) {
+ File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt");
+ noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
+ }
+
+ testMeta.fileSplitterInput.setBlockSize(2L);
+ testMeta.fileSplitterInput.setBlocksThreshold(10);
+ testMeta.fileSplitterInput.beginWindow(1);
+
+ testMeta.scanner.semaphore.acquire();
+ testMeta.fileSplitterInput.emitTuples();
+ testMeta.fileSplitterInput.endWindow();
+
+ Assert.assertEquals("Blocks", 10, testMeta.blockMetadataSink.collectedTuples.size());
+
+ for (int window = 2; window < 8; window++) {
+ testMeta.fileSplitterInput.beginWindow(window);
+ testMeta.fileSplitterInput.emitTuples();
+ testMeta.fileSplitterInput.endWindow();
+ }
+
+ Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
+ Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size());
+ }
+
+ @Test
+ public void testIdempotencyWithBlocksThreshold() throws InterruptedException
+ {
+ IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+ testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager);
+ testMeta.fileSplitterInput.setBlocksThreshold(10);
+ testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
+ testMeta.fileSplitterInput.setup(testMeta.context);
+
+ testBlocksThreshold();
+ testMeta.fileMetadataSink.clear();
+ testMeta.blockMetadataSink.clear();
+
+ testMeta.fileSplitterInput.setup(testMeta.context);
+ for (int i = 1; i < 8; i++) {
+ testMeta.fileSplitterInput.beginWindow(i);
+ }
+ Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
+ Assert.assertEquals("Blocks", 62, testMeta.blockMetadataSink.collectedTuples.size());
+ }
+
+ @Test
+ public void testFirstWindowAfterRecovery() throws IOException, InterruptedException
+ {
+ testIdempotencyWithBlocksThreshold();
+ Thread.sleep(1000);
+ HashSet<String> lines = Sets.newHashSet();
+ for (int line = 2; line < 4; line++) {
+ lines.add("f13" + "l" + line);
+ }
+ File f13 = new File(testMeta.dataDirectory, "file13" + ".txt");
+
+ FileUtils.writeLines(f13, lines, true);
+
+ testMeta.fileMetadataSink.clear();
+ testMeta.blockMetadataSink.clear();
+
+ testMeta.fileSplitterInput.beginWindow(8);
+ testMeta.scanner.semaphore.acquire();
+ testMeta.fileSplitterInput.emitTuples();
+ testMeta.fileSplitterInput.endWindow();
+
+ Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+ Assert.assertEquals("Blocks", 6, testMeta.blockMetadataSink.collectedTuples.size());
+ }
+
+ @Test
+ public void testRecoveryOfPartialFile() throws InterruptedException
+ {
+ IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+ testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager);
+ testMeta.fileSplitterInput.setBlockSize(2L);
+ testMeta.fileSplitterInput.setBlocksThreshold(2);
+ testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
+
+ Kryo kryo = new Kryo();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ Output loutput = new Output(bos);
+ kryo.writeObject(loutput, testMeta.fileSplitterInput);
+ loutput.close();
+
+ testMeta.fileSplitterInput.setup(testMeta.context);
+
+ testMeta.fileSplitterInput.beginWindow(1);
+
+ testMeta.scanner.semaphore.acquire();
+ testMeta.fileSplitterInput.emitTuples();
+ testMeta.fileSplitterInput.endWindow();
+
+    //file0.txt has 5 blocks. Since the blocks threshold per window is 2, only 2 blocks are emitted in this window.
+ Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+ Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+
+ testMeta.fileMetadataSink.clear();
+ testMeta.blockMetadataSink.clear();
+
+ testMeta.fileSplitterInput.teardown();
+
+    //simulate a failure: the operator is restored from the serialized state, as if re-deployed
+ Input lInput = new Input(bos.toByteArray());
+ testMeta.fileSplitterInput = kryo.readObject(lInput, testMeta.fileSplitterInput.getClass());
+ lInput.close();
+ TestUtils.setSink(testMeta.fileSplitterInput.blocksMetadataOutput, testMeta.blockMetadataSink);
+ TestUtils.setSink(testMeta.fileSplitterInput.filesMetadataOutput, testMeta.fileMetadataSink);
+
+ testMeta.fileSplitterInput.setup(testMeta.context);
+ testMeta.fileSplitterInput.beginWindow(1);
+
+ Assert.assertEquals("Recovered Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+ Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+
+ testMeta.fileSplitterInput.beginWindow(2);
+ testMeta.fileSplitterInput.emitTuples();
+ testMeta.fileSplitterInput.endWindow();
+
+ Assert.assertEquals("Blocks", 4, testMeta.blockMetadataSink.collectedTuples.size());
+
+ String file1 = testMeta.fileMetadataSink.collectedTuples.get(0).getFileName();
+
+ testMeta.fileMetadataSink.clear();
+ testMeta.blockMetadataSink.clear();
+
+ testMeta.fileSplitterInput.beginWindow(3);
+ ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
+ testMeta.fileSplitterInput.emitTuples();
+ testMeta.fileSplitterInput.endWindow();
+
+ Assert.assertEquals("New file", 1, testMeta.fileMetadataSink.collectedTuples.size());
+ Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+
+ String file2 = testMeta.fileMetadataSink.collectedTuples.get(0).getFileName();
+
+ Assert.assertTrue("Block file name 0", testMeta.blockMetadataSink.collectedTuples.get(0).getFilePath().endsWith(file1));
+ Assert.assertTrue("Block file name 1", testMeta.blockMetadataSink.collectedTuples.get(1).getFilePath().endsWith(file2));
+ }
+
+ @Test
+ public void testRecursive() throws InterruptedException, IOException
+ {
+ testMeta.fileSplitterInput.getScanner().regex = null;
+ testFileMetadata();
+ testMeta.fileMetadataSink.clear();
+ testMeta.blockMetadataSink.clear();
+
+ Thread.sleep(1000);
+    //add a new file in a child directory
+ File f13 = new File(testMeta.dataDirectory + "/child", "file13" + ".txt");
+ HashSet<String> lines = Sets.newHashSet();
+ for (int line = 0; line < 2; line++) {
+ lines.add("f13" + "l" + line);
+ }
+ FileUtils.write(f13, StringUtils.join(lines, '\n'));
+
+ //window 2
+ testMeta.fileSplitterInput.beginWindow(2);
+ testMeta.scanner.semaphore.acquire();
+ testMeta.fileSplitterInput.emitTuples();
+ testMeta.fileSplitterInput.endWindow();
+
+ Assert.assertEquals("window 2: files", 2, testMeta.fileMetadataSink.collectedTuples.size());
+ Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
+ }
+
+ @Test
+ public void testSingleFile() throws InterruptedException, IOException
+ {
+ testMeta.fileSplitterInput.teardown();
+ testMeta.fileSplitterInput.setScanner(new MockScanner());
+ testMeta.fileSplitterInput.getScanner().regex = null;
+ testMeta.fileSplitterInput.getScanner().setFiles(testMeta.dataDirectory + "/file1.txt");
+
+ testMeta.fileSplitterInput.setup(testMeta.context);
+ testMeta.fileSplitterInput.beginWindow(1);
+ ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
+
+ testMeta.fileSplitterInput.emitTuples();
+ testMeta.fileSplitterInput.endWindow();
+ Assert.assertEquals("File metadata count", 1, testMeta.fileMetadataSink.collectedTuples.size());
+ Assert.assertEquals("File metadata", new File(testMeta.dataDirectory + "/file1.txt").getAbsolutePath(),
+ testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath());
+ }
+
+ private static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner
+ {
+ transient Semaphore semaphore;
+
+ private MockScanner()
+ {
+ super();
+ this.semaphore = new Semaphore(0);
+ }
+
+ @Override
+ protected void scanComplete()
+ {
+ super.scanComplete();
+ if (discoveredFiles.size() > 0 && discoveredFiles.getLast().isLastFileOfScan()) {
+ semaphore.release();
+ LOG.debug("discovered {}", discoveredFiles.size());
+ }
+
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInputTest.class);
+}
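The block-count assertions in the tests above (testBlockMetadataWithSplit, testBlocksThreshold, testIdempotencyWithBlocksThreshold) all reduce to the same arithmetic: a file of length L split with block size B yields ceil(L / B) blocks. The sketch below is illustrative only and not part of the patch; the file lengths come from the generated test data (two 4-character lines joined by '\n' for file0.txt through file9.txt, two 5-character lines for file10.txt and file11.txt).

public class BlockCountSketch
{
  // ceil(fileLength / blockSize) for positive values, using integer arithmetic.
  static int expectedBlockCount(long fileLength, long blockSize)
  {
    return (int) ((fileLength + blockSize - 1) / blockSize);
  }

  public static void main(String[] args)
  {
    // "f0l0\nf0l1" is 9 bytes; with blockSize = 2 that is 5 blocks per file.
    System.out.println(expectedBlockCount(9, 2));   // 5
    // "f10l0\nf10l1" is 11 bytes; 6 blocks per file.
    System.out.println(expectedBlockCount(11, 2));  // 6
    // 10 files of 5 blocks + 2 files of 6 blocks = 62, the total asserted
    // in testIdempotencyWithBlocksThreshold.
    System.out.println(10 * 5 + 2 * 6);             // 62
  }
}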
[4/4] incubator-apex-malhar git commit: Merge branch
'feature-splitter' of github.com:chandnisingh/incubator-apex-malhar into
devel-3
Posted by pr...@apache.org.
Merge branch 'feature-splitter' of github.com:chandnisingh/incubator-apex-malhar into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d710af9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d710af9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d710af9b
Branch: refs/heads/devel-3
Commit: d710af9b1a5a84de8e179c783bb96c5405af95de
Parents: ffc25e5 a3f1b3c
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Wed Sep 23 22:50:55 2015 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Wed Sep 23 22:50:55 2015 -0700
----------------------------------------------------------------------
.../datatorrent/lib/io/block/BlockMetadata.java | 78 ++-
.../lib/io/fs/AbstractFileSplitter.java | 539 +++++++++++++++++
.../com/datatorrent/lib/io/fs/FileSplitter.java | 6 +-
.../datatorrent/lib/io/fs/FileSplitterBase.java | 142 +++++
.../lib/io/fs/FileSplitterInput.java | 598 +++++++++++++++++++
.../lib/io/fs/FileSplitterBaseTest.java | 264 ++++++++
.../lib/io/fs/FileSplitterInputTest.java | 484 +++++++++++++++
7 files changed, 2098 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-apex-malhar git commit: Removed delimiter
Posted by pr...@apache.org.
Removed delimiter
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a3f1b3ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a3f1b3ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a3f1b3ca
Branch: refs/heads/devel-3
Commit: a3f1b3cac46e1821f0fcbdb0adc276a56cb447ef
Parents: f99696a
Author: Chandni Singh <cs...@apache.org>
Authored: Wed Sep 23 18:18:26 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Wed Sep 23 18:18:26 2015 -0700
----------------------------------------------------------------------
.../java/com/datatorrent/lib/io/fs/FileSplitterInput.java | 7 -------
1 file changed, 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a3f1b3ca/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index 2560191..92cb97a 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -176,9 +176,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
protected boolean processFileInfo(FileInfo fileInfo)
{
ScannedFileInfo scannedFileInfo = (ScannedFileInfo)fileInfo;
- if (scannedFileInfo == TimeBasedDirectoryScanner.DELIMITER) {
- return false;
- }
currentWindowRecoveryState.add(scannedFileInfo);
updateReferenceTimes(scannedFileInfo);
return super.processFileInfo(fileInfo);
@@ -258,7 +255,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
public static class TimeBasedDirectoryScanner implements Runnable, Component<Context.OperatorContext>
{
private static long DEF_SCAN_INTERVAL_MILLIS = 5000;
- private static ScannedFileInfo DELIMITER = new ScannedFileInfo();
private boolean recursive;
@@ -368,9 +364,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
protected void scanIterationComplete()
{
LOG.debug("scan complete {} {}", lastScanMillis, numDiscoveredPerIteration);
- if (numDiscoveredPerIteration > 0) {
- discoveredFiles.add(DELIMITER);
- }
lastScanMillis = System.currentTimeMillis();
}
[2/4] incubator-apex-malhar git commit: discoveredFiles getting empty
between scan and scan iteration completion
Posted by pr...@apache.org.
discoveredFiles getting empty between scan and scan iteration completion
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f99696af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f99696af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f99696af
Branch: refs/heads/devel-3
Commit: f99696aff2e46cd50d3060480d4d3018ac65e3e1
Parents: ea202da
Author: Chandni Singh <cs...@apache.org>
Authored: Tue Sep 22 23:01:36 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Wed Sep 23 17:48:15 2015 -0700
----------------------------------------------------------------------
.../lib/io/fs/FileSplitterInput.java | 43 ++++++++++++--------
.../lib/io/fs/FileSplitterInputTest.java | 8 ++--
2 files changed, 30 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f99696af/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index a381be5..2560191 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -176,9 +176,12 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
protected boolean processFileInfo(FileInfo fileInfo)
{
ScannedFileInfo scannedFileInfo = (ScannedFileInfo)fileInfo;
+ if (scannedFileInfo == TimeBasedDirectoryScanner.DELIMITER) {
+ return false;
+ }
currentWindowRecoveryState.add(scannedFileInfo);
updateReferenceTimes(scannedFileInfo);
- return super.processFileInfo(fileInfo) && !scannedFileInfo.lastFileOfScan;
+ return super.processFileInfo(fileInfo);
}
protected void updateReferenceTimes(ScannedFileInfo fileInfo)
@@ -255,6 +258,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
public static class TimeBasedDirectoryScanner implements Runnable, Component<Context.OperatorContext>
{
private static long DEF_SCAN_INTERVAL_MILLIS = 5000;
+ private static ScannedFileInfo DELIMITER = new ScannedFileInfo();
private boolean recursive;
@@ -281,6 +285,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
protected transient Map<String, Long> referenceTimes;
private transient ScannedFileInfo lastScannedInfo;
+ private transient int numDiscoveredPerIteration;
public TimeBasedDirectoryScanner()
{
@@ -339,10 +344,12 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
if ((trigger || (System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis)) &&
(lastScannedInfo == null || referenceTimes.get(lastScannedInfo.getFilePath()) != null)) {
trigger = false;
+ lastScannedInfo = null;
+ numDiscoveredPerIteration = 0;
for (String afile : files) {
scan(new Path(afile), null);
}
- scanComplete();
+ scanIterationComplete();
} else {
Thread.sleep(sleepMillis);
}
@@ -358,13 +365,11 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
/**
* Operations that need to be done once a scan is complete.
*/
- protected void scanComplete()
+ protected void scanIterationComplete()
{
- LOG.debug("scan complete {}", lastScanMillis);
- ScannedFileInfo fileInfo = discoveredFiles.peekLast();
- if (fileInfo != null) {
- fileInfo.lastFileOfScan = true;
- lastScannedInfo = fileInfo;
+ LOG.debug("scan complete {} {}", lastScanMillis, numDiscoveredPerIteration);
+ if (numDiscoveredPerIteration > 0) {
+ discoveredFiles.add(DELIMITER);
}
lastScanMillis = System.currentTimeMillis();
}
@@ -398,7 +403,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
}
if (acceptFile(childPathStr)) {
LOG.debug("found {}", childPathStr);
- discoveredFiles.add(info);
+ processDiscoveredFile(info);
} else {
// don't look at it again
ignoredFiles.add(childPathStr);
@@ -411,6 +416,13 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
}
}
+ protected void processDiscoveredFile(ScannedFileInfo info)
+ {
+ numDiscoveredPerIteration++;
+ lastScannedInfo = info;
+ discoveredFiles.add(info);
+ }
+
protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath, @SuppressWarnings("UnusedParameters") FileStatus childStatus, Path rootPath)
{
ScannedFileInfo info;
@@ -462,6 +474,11 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
return discoveredFiles.poll();
}
+ protected int getNumDiscoveredPerIteration()
+ {
+ return numDiscoveredPerIteration;
+ }
+
/**
* Gets the regular expression for file names to split.
*
@@ -570,9 +587,8 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
public static class ScannedFileInfo extends AbstractFileSplitter.FileInfo
{
protected final long modifiedTime;
- private transient boolean lastFileOfScan;
- private ScannedFileInfo()
+ protected ScannedFileInfo()
{
super();
modifiedTime = -1;
@@ -583,11 +599,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
super(directoryPath, relativeFilePath);
this.modifiedTime = modifiedTime;
}
-
- protected boolean isLastFileOfScan()
- {
- return lastFileOfScan;
- }
}
private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInput.class);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f99696af/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index 8dfea7a..1e2a25c 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -471,14 +471,12 @@ public class FileSplitterInputTest
}
@Override
- protected void scanComplete()
+ protected void scanIterationComplete()
{
- super.scanComplete();
- if (discoveredFiles.size() > 0 && discoveredFiles.getLast().isLastFileOfScan()) {
+ if (getNumDiscoveredPerIteration() > 0) {
semaphore.release();
- LOG.debug("discovered {}", discoveredFiles.size());
}
-
+ super.scanIterationComplete();
}
}
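The commit message above describes the underlying race: the scanner thread fills discoveredFiles while the operator thread drains it, so marking "the last file of a scan" by peeking at the tail of the queue can miss, because the queue may already be empty by the time the scan iteration finishes. The sketch below illustrates the counter-based alternative the patch switches to; it is a simplified stand-in with illustrative names (ScanIterationSketch, scanOnce, onScanIterationComplete), not the operator's actual API.

import java.util.concurrent.LinkedBlockingDeque;

public class ScanIterationSketch
{
  // Producer side of a producer-consumer pair: another thread may drain this deque
  // at any time, so its contents cannot be used to decide when an iteration ended.
  private final LinkedBlockingDeque<String> discovered = new LinkedBlockingDeque<>();
  private int numDiscoveredPerIteration;

  void scanOnce(String[] files)
  {
    numDiscoveredPerIteration = 0;
    for (String f : files) {
      numDiscoveredPerIteration++;
      discovered.add(f);           // the consumer may poll this immediately
    }
    // Racy alternative: discovered.peekLast() can already be null here.
    if (numDiscoveredPerIteration > 0) {
      onScanIterationComplete();   // safe: based on the counter, not the deque state
    }
  }

  void onScanIterationComplete()
  {
    System.out.println("discovered " + numDiscoveredPerIteration + " files this iteration");
  }

  public static void main(String[] args)
  {
    new ScanIterationSketch().scanOnce(new String[]{"a.txt", "b.txt"});
  }
}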