You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/19 06:10:56 UTC
apex-malhar git commit: APEXMALHAR-2263 change offset in AbstractFileInputOperator from int to long
Repository: apex-malhar
Updated Branches:
refs/heads/master abb3900c9 -> 2f308aa21
APEXMALHAR-2263 change offset in AbstractFileInputOperator from int to long
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2f308aa2
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2f308aa2
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2f308aa2
Branch: refs/heads/master
Commit: 2f308aa21dacff082f854d583df6dff9d1ec30ec
Parents: abb3900
Author: Matt Zhang <ma...@gmail.com>
Authored: Fri Oct 14 16:06:04 2016 -0700
Committer: Matt Zhang <ma...@gmail.com>
Committed: Tue Oct 18 22:37:01 2016 -0700
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileInputOperator.java | 25 ++++++++++----------
pom.xml | 1 +
2 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f308aa2/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index f0e3fbb..9e80b4e 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -96,6 +96,7 @@ import com.datatorrent.lib.util.KryoCloneUtils;
* @param <T> The type of the object that this input operator reads.
* @since 1.0.2
*/
+@org.apache.hadoop.classification.InterfaceStability.Evolving
public abstract class AbstractFileInputOperator<T>
implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener, Operator.CheckpointListener
{
@@ -106,7 +107,7 @@ public abstract class AbstractFileInputOperator<T>
@NotNull
protected DirectoryScanner scanner = new DirectoryScanner();
protected int scanIntervalMillis = 5000;
- protected int offset;
+ protected long offset;
protected String currentFile;
protected Set<String> processedFiles = new HashSet<String>();
protected int emitBatchSize = 1000;
@@ -114,7 +115,7 @@ public abstract class AbstractFileInputOperator<T>
protected int partitionCount = 1;
private int retryCount = 0;
private int maxRetryCount = 5;
- protected transient int skipCount = 0;
+ protected transient long skipCount = 0;
private transient OperatorContext context;
private final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class);
@@ -143,7 +144,7 @@ public abstract class AbstractFileInputOperator<T>
protected static class FailedFile
{
String path;
- int offset;
+ long offset;
int retryCount;
long lastFailedTime;
@@ -151,14 +152,14 @@ public abstract class AbstractFileInputOperator<T>
@SuppressWarnings("unused")
protected FailedFile() {}
- protected FailedFile(String path, int offset)
+ protected FailedFile(String path, long offset)
{
this.path = path;
this.offset = offset;
this.retryCount = 0;
}
- protected FailedFile(String path, int offset, int retryCount)
+ protected FailedFile(String path, long offset, int retryCount)
{
this.path = path;
this.offset = offset;
@@ -623,7 +624,7 @@ public abstract class AbstractFileInputOperator<T>
try {
if (currentFile != null && offset > 0) {
//open file resets offset to 0 so this a way around it.
- int tmpOffset = offset;
+ long tmpOffset = offset;
if (fs.exists(new Path(currentFile))) {
this.inputStream = openFile(new Path(currentFile));
offset = tmpOffset;
@@ -651,7 +652,7 @@ public abstract class AbstractFileInputOperator<T>
}
}
if (inputStream != null) {
- int startOffset = offset;
+ long startOffset = offset;
String file = currentFile; //current file is reset to null when closed.
try {
@@ -1130,8 +1131,8 @@ public abstract class AbstractFileInputOperator<T>
protected static class RecoveryEntry
{
final String file;
- final int startOffset;
- final int endOffset;
+ final long startOffset;
+ final long endOffset;
@SuppressWarnings("unused")
private RecoveryEntry()
@@ -1141,7 +1142,7 @@ public abstract class AbstractFileInputOperator<T>
endOffset = -1;
}
- RecoveryEntry(String file, int startOffset, int endOffset)
+ RecoveryEntry(String file, long startOffset, long endOffset)
{
this.file = Preconditions.checkNotNull(file, "file");
this.startOffset = startOffset;
@@ -1174,8 +1175,8 @@ public abstract class AbstractFileInputOperator<T>
public int hashCode()
{
int result = file.hashCode();
- result = 31 * result + startOffset;
- result = 31 * result + endOffset;
+ result = 31 * result + (int)(startOffset & 0xFFFFFFFF);
+ result = 31 * result + (int)(endOffset & 0xFFFFFFFF);
return result;
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f308aa2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cb8bd93..7a392d0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,6 +135,7 @@
<exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude>
<exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude>
<exclude>com.datatorrent.lib.io.fs.FSInputModule</exclude>
+ <exclude>com.datatorrent.lib.io.fs.AbstractFileInputOperator</exclude>
</excludes>
</parameter>
<skip>${semver.plugin.skip}</skip>