You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/19 06:10:56 UTC

apex-malhar git commit: APEXMALHAR-2263 change offset in AbstractFileInputOperator from int to long

Repository: apex-malhar
Updated Branches:
  refs/heads/master abb3900c9 -> 2f308aa21


APEXMALHAR-2263 change offset in AbstractFileInputOperator from int to long


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2f308aa2
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2f308aa2
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2f308aa2

Branch: refs/heads/master
Commit: 2f308aa21dacff082f854d583df6dff9d1ec30ec
Parents: abb3900
Author: Matt Zhang <ma...@gmail.com>
Authored: Fri Oct 14 16:06:04 2016 -0700
Committer: Matt Zhang <ma...@gmail.com>
Committed: Tue Oct 18 22:37:01 2016 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileInputOperator.java    | 25 ++++++++++----------
 pom.xml                                         |  1 +
 2 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f308aa2/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index f0e3fbb..9e80b4e 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -96,6 +96,7 @@ import com.datatorrent.lib.util.KryoCloneUtils;
  * @param <T> The type of the object that this input operator reads.
  * @since 1.0.2
  */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
 public abstract class AbstractFileInputOperator<T>
     implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener, Operator.CheckpointListener
 {
@@ -106,7 +107,7 @@ public abstract class AbstractFileInputOperator<T>
   @NotNull
   protected DirectoryScanner scanner = new DirectoryScanner();
   protected int scanIntervalMillis = 5000;
-  protected int offset;
+  protected long offset;
   protected String currentFile;
   protected Set<String> processedFiles = new HashSet<String>();
   protected int emitBatchSize = 1000;
@@ -114,7 +115,7 @@ public abstract class AbstractFileInputOperator<T>
   protected int partitionCount = 1;
   private int retryCount = 0;
   private int maxRetryCount = 5;
-  protected transient int skipCount = 0;
+  protected transient long skipCount = 0;
   private transient OperatorContext context;
 
   private final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class);
@@ -143,7 +144,7 @@ public abstract class AbstractFileInputOperator<T>
   protected static class FailedFile
   {
     String path;
-    int   offset;
+    long   offset;
     int    retryCount;
     long   lastFailedTime;
 
@@ -151,14 +152,14 @@ public abstract class AbstractFileInputOperator<T>
     @SuppressWarnings("unused")
     protected FailedFile() {}
 
-    protected FailedFile(String path, int offset)
+    protected FailedFile(String path, long offset)
     {
       this.path = path;
       this.offset = offset;
       this.retryCount = 0;
     }
 
-    protected FailedFile(String path, int offset, int retryCount)
+    protected FailedFile(String path, long offset, int retryCount)
     {
       this.path = path;
       this.offset = offset;
@@ -623,7 +624,7 @@ public abstract class AbstractFileInputOperator<T>
       try {
         if (currentFile != null && offset > 0) {
          //open file resets offset to 0 so this is a way around it.
-          int tmpOffset = offset;
+          long tmpOffset = offset;
           if (fs.exists(new Path(currentFile))) {
             this.inputStream = openFile(new Path(currentFile));
             offset = tmpOffset;
@@ -651,7 +652,7 @@ public abstract class AbstractFileInputOperator<T>
       }
     }
     if (inputStream != null) {
-      int startOffset = offset;
+      long startOffset = offset;
       String file  = currentFile; //current file is reset to null when closed.
 
       try {
@@ -1130,8 +1131,8 @@ public abstract class AbstractFileInputOperator<T>
   protected static class RecoveryEntry
   {
     final String file;
-    final int startOffset;
-    final int endOffset;
+    final long startOffset;
+    final long endOffset;
 
     @SuppressWarnings("unused")
     private RecoveryEntry()
@@ -1141,7 +1142,7 @@ public abstract class AbstractFileInputOperator<T>
       endOffset = -1;
     }
 
-    RecoveryEntry(String file, int startOffset, int endOffset)
+    RecoveryEntry(String file, long startOffset, long endOffset)
     {
       this.file = Preconditions.checkNotNull(file, "file");
       this.startOffset = startOffset;
@@ -1174,8 +1175,8 @@ public abstract class AbstractFileInputOperator<T>
     public int hashCode()
     {
       int result = file.hashCode();
-      result = 31 * result + startOffset;
-      result = 31 * result + endOffset;
+      result = 31 * result + (int)(startOffset & 0xFFFFFFFF);
+      result = 31 * result + (int)(endOffset & 0xFFFFFFFF);
       return result;
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f308aa2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cb8bd93..7a392d0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,6 +135,7 @@
                 <exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude>
                 <exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude>
                 <exclude>com.datatorrent.lib.io.fs.FSInputModule</exclude>
+                <exclude>com.datatorrent.lib.io.fs.AbstractFileInputOperator</exclude>
               </excludes>
             </parameter>
             <skip>${semver.plugin.skip}</skip>