You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was lost in this plain-text copy).
Posted to commits@apex.apache.org by pr...@apache.org on 2016/10/25 16:03:37 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2254, APEXMALHAR-2269, APEXMALHAR-2270 Fix bugs on replay tuple skipping and idempotency issues.

Repository: apex-malhar
Updated Branches:
  refs/heads/master c89e63621 -> 999efccba


APEXMALHAR-2254, APEXMALHAR-2269, APEXMALHAR-2270 Fix bugs on replay tuple skipping and idempotency issues.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0abb698e
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0abb698e
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0abb698e

Branch: refs/heads/master
Commit: 0abb698e5c784c8813943065acccda1993f8cb5d
Parents: c92ca15
Author: Matt Zhang <ma...@datatorrent.com>
Authored: Mon Oct 10 09:49:47 2016 -0700
Committer: Matt Zhang <ma...@gmail.com>
Committed: Sun Oct 23 19:25:51 2016 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileInputOperator.java    |  26 ++-
 .../io/fs/AbstractFileInputOperatorTest.java    | 201 +++++++++++++++++++
 2 files changed, 223 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0abb698e/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index d4ee03b..0f3cc48 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -592,17 +592,27 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
                 pendingFiles.remove(recoveryEntry.file);
               }
               inputStream = retryFailedFile(new FailedFile(recoveryEntry.file, recoveryEntry.startOffset));
+
+              while (--skipCount >= 0) {
+                readEntity();
+              }
               while (offset < recoveryEntry.endOffset) {
                 T line = readEntity();
                 offset++;
                 emit(line);
               }
+              if (recoveryEntry.fileClosed) {
+                closeFile(inputStream);
+              }
             } else {
               while (offset < recoveryEntry.endOffset) {
                 T line = readEntity();
                 offset++;
                 emit(line);
               }
+              if (recoveryEntry.fileClosed) {
+                closeFile(inputStream);
+              }
             }
           }
         }
@@ -654,6 +664,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
     if (inputStream != null) {
       long startOffset = offset;
       String file  = currentFile; //current file is reset to null when closed.
+      boolean fileClosed = false;
 
       try {
         int counterForTuple = 0;
@@ -662,6 +673,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
           if (line == null) {
             LOG.info("done reading file ({} entries).", offset);
             closeFile(inputStream);
+            fileClosed = true;
             break;
           }
 
@@ -679,9 +691,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
       } catch (IOException e) {
         failureHandling(e);
       }
-      //Only when something was emitted from the file then we record it for entry.
-      if (offset > startOffset) {
-        currentWindowRecoveryState.add(new RecoveryEntry(file, startOffset, offset));
+      //Only when something was emitted from the file, or we have a closeFile(), then we record it for entry.
+      if (offset >= startOffset) {
+        currentWindowRecoveryState.add(new RecoveryEntry(file, startOffset, offset, fileClosed));
       }
     }
   }
@@ -1138,6 +1150,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
     final String file;
     final long startOffset;
     final long endOffset;
+    final boolean fileClosed;
 
     @SuppressWarnings("unused")
     private RecoveryEntry()
@@ -1145,13 +1158,15 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
       file = null;
       startOffset = -1;
       endOffset = -1;
+      fileClosed = false;
     }
 
-    RecoveryEntry(String file, long startOffset, long endOffset)
+    RecoveryEntry(String file, long startOffset, long endOffset, boolean fileClosed)
     {
       this.file = Preconditions.checkNotNull(file, "file");
       this.startOffset = startOffset;
       this.endOffset = endOffset;
+      this.fileClosed = fileClosed;
     }
 
     @Override
@@ -1172,6 +1187,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
       if (startOffset != that.startOffset) {
         return false;
       }
+      if (fileClosed != that.fileClosed) {
+        return false;
+      }
       return file.equals(that.file);
 
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0abb698e/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index 2f926d3..e9346ec 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -18,15 +18,19 @@
  */
 package com.datatorrent.lib.io.fs;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeSet;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -44,6 +48,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -816,6 +822,181 @@ public class AbstractFileInputOperatorTest
   }
 
   @Test
+  public void testIdempotencyWithCheckPoint() throws Exception
+  {
+    FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true); // start from an empty test directory
+
+    List<String> lines = Lists.newArrayList();
+    int file = 0; // file0 gets 5 lines: f0l0..f0l4
+    for (int line = 0; line < 5; line++) {
+      lines.add("f" + file + "l" + line);
+    }
+    FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+
+    file = 1; // file1 gets 6 lines: f1l0..f1l5
+    lines = Lists.newArrayList();
+    for (int line = 0; line < 6; line++) {
+      lines.add("f" + file + "l" + line);
+    }
+    FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+
+    // file2 is written with no lines at all (empty file)
+    file = 2;
+    lines = Lists.newArrayList();
+    FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+
+
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
+    FSWindowDataManager manager = new FSWindowDataManager();
+    manager.setStatePath(testMeta.dir + "/recovery"); // per-window recovery state lives under the test dir
+
+    oper.setWindowDataManager(manager);
+
+    oper.setDirectory(testMeta.dir);
+    oper.getScanner().setFilePatternRegexp(".*file[\\d]");
+
+    oper.setup(testMeta.context);
+
+    oper.setEmitBatchSize(3); // the window comments below expect at most 3 lines per emitTuples() call
+
+    // swap in a scanner that returns scanned files in sorted order, so files are processed deterministically
+    DirectoryScannerNew newScanner = new DirectoryScannerNew();
+    oper.setScanner(newScanner);
+
+    // window 0: scan the directory
+    oper.beginWindow(0);
+    oper.emitTuples();
+    oper.endWindow();
+
+    // emit f0l0, f0l1, f0l2
+    oper.beginWindow(1);
+    oper.emitTuples();
+    oper.endWindow();
+
+    // checkpoint after window 1; the serialized operator state is kept in bos
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    LineByLineFileInputOperator checkPointOper = checkpoint(oper, bos);
+
+    // collect output only from window 2 onwards, i.e. what must be replayed after recovery
+    CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+    TestUtils.setSink(oper.output, queryResults);
+
+    // emit f0l3, f0l4, and closeFile(f0) in the same window
+    oper.beginWindow(2);
+    oper.emitTuples();
+    oper.endWindow();
+    List<String> beforeRecovery2 = Lists.newArrayList(queryResults.collectedTuples);
+
+    // emit f1l0, f1l1, f1l2
+    oper.beginWindow(3);
+    oper.emitTuples();
+    oper.endWindow();
+    List<String> beforeRecovery3 = Lists.newArrayList(queryResults.collectedTuples);
+
+    // emit f1l3, f1l4, f1l5
+    oper.beginWindow(4);
+    oper.emitTuples();
+    oper.endWindow();
+    List<String> beforeRecovery4 = Lists.newArrayList(queryResults.collectedTuples);
+
+    // closeFile(f1) in a new window
+    oper.beginWindow(5);
+    oper.emitTuples();
+    oper.endWindow();
+    List<String> beforeRecovery5 = Lists.newArrayList(queryResults.collectedTuples);
+
+    // empty file ops, closeFile(f2) in emitTuples() only
+    oper.beginWindow(6);
+    oper.emitTuples();
+    oper.endWindow();
+    List<String> beforeRecovery6 = Lists.newArrayList(queryResults.collectedTuples);
+
+    oper.teardown();
+
+    queryResults.clear();
+
+    // idempotency part: restore from the checkpoint and replay windows 2..6; output must match the first run
+
+    oper = restoreCheckPoint(checkPointOper, bos);
+    testMeta.context.getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L);
+    oper.setup(testMeta.context);
+    TestUtils.setSink(oper.output, queryResults);
+
+    long startwid = testMeta.context.getAttributes().get(Context.OperatorContext.ACTIVATION_WINDOW_ID) + 1; // first replayed window is 2
+
+    oper.beginWindow(startwid);
+    Assert.assertTrue(oper.currentFile == null);
+    oper.emitTuples();
+    oper.endWindow();
+    Assert.assertEquals("lines", beforeRecovery2, queryResults.collectedTuples);
+
+    oper.beginWindow(++startwid);
+    oper.emitTuples();
+    oper.endWindow();
+    Assert.assertEquals("lines", beforeRecovery3, queryResults.collectedTuples);
+
+    oper.beginWindow(++startwid);
+    oper.emitTuples();
+    oper.endWindow();
+    Assert.assertEquals("lines", beforeRecovery4, queryResults.collectedTuples);
+
+    oper.beginWindow(++startwid);
+    Assert.assertTrue(oper.currentFile == null);
+    oper.emitTuples();
+    oper.endWindow();
+    Assert.assertEquals("lines", beforeRecovery5, queryResults.collectedTuples);
+
+    oper.beginWindow(++startwid);
+    Assert.assertTrue(oper.currentFile == null);
+    oper.emitTuples();
+    oper.endWindow();
+    Assert.assertEquals("lines", beforeRecovery6, queryResults.collectedTuples);
+
+    Assert.assertEquals("number tuples", 8, queryResults.collectedTuples.size()); // f0l3, f0l4 plus f1l0..f1l5
+
+    oper.teardown();
+  }
+
+  /**
+   * Simulates a checkpoint: Kryo-serializes the given operator into {@code bos}, then deserializes a copy from it.
+   * @param oper The operator to checkpoint.
+   * @param bos Receives the serialized checkpoint bytes; pass the same stream to restoreCheckPoint() later.
+   * @return a new operator instance deserialized from the checkpoint bytes.
+   */
+  public static LineByLineFileInputOperator checkpoint(LineByLineFileInputOperator oper, ByteArrayOutputStream bos) throws Exception
+  {
+    Kryo kryo = new Kryo();
+
+    Output loutput = new Output(bos);
+    kryo.writeObject(loutput, oper);
+    loutput.close(); // flush the buffered bytes into bos before they are read back below
+
+    Input lInput = new Input(bos.toByteArray());
+    @SuppressWarnings("unchecked") // NOTE(review): nothing unchecked happens here; this suppression looks unnecessary
+    LineByLineFileInputOperator checkPointedOper = kryo.readObject(lInput, oper.getClass());
+    lInput.close();
+
+    return checkPointedOper;
+  }
+
+  /**
+   * Restores an operator by Kryo-deserializing the checkpoint bytes held in {@code bos}; returns the new instance.
+   * @param checkPointOper Only its runtime class is used, to select the type to deserialize.
+   * @param bos The stream previously filled by checkpoint(); its bytes are read, not modified.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public static LineByLineFileInputOperator restoreCheckPoint(LineByLineFileInputOperator checkPointOper, ByteArrayOutputStream bos) throws Exception
+  {
+    Kryo kryo = new Kryo();
+
+    Input lInput = new Input(bos.toByteArray());
+    LineByLineFileInputOperator oper = kryo.readObject(lInput, checkPointOper.getClass());
+    lInput.close();
+
+    return oper;
+  }
+
+  @Test
   public void testWindowDataManagerPartitioning() throws Exception
   {
     LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
@@ -926,4 +1107,24 @@ public class AbstractFileInputOperatorTest
     accepted = scanner.acceptFile("1_file");
     Assert.assertFalse("File should not be accepted by this partition ", accepted);
   }
+
+  private static class DirectoryScannerNew extends DirectoryScanner // test scanner: returns scanned files sorted by Path's natural ordering
+  {
+    public LinkedHashSet<Path> scan(FileSystem fs, Path filePath, Set<String> consumedFiles) // NOTE(review): presumably overrides DirectoryScanner.scan; consider adding @Override
+    {
+      LinkedHashSet<Path> pathSet;
+      pathSet = super.scan(fs, filePath, consumedFiles);
+
+      TreeSet<Path> orderFiles = new TreeSet<>(); // sorts the scanned paths by their natural (compareTo) ordering
+      orderFiles.addAll(pathSet);
+      pathSet.clear(); // refill the same set; LinkedHashSet keeps the sorted insertion order
+      Iterator<Path> fileIterator = orderFiles.iterator();
+      while (fileIterator.hasNext()) {
+        pathSet.add(fileIterator.next());
+      }
+
+      return pathSet;
+    }
+  }
 }
+


[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2254-2269-2270' of github.com:mattqzhang/apex-malhar

Posted by pr...@apache.org.
Merge branch 'APEXMALHAR-2254-2269-2270' of github.com:mattqzhang/apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/999efccb
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/999efccb
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/999efccb

Branch: refs/heads/master
Commit: 999efccba519f50c9af9485d2b06794bf4756b34
Parents: c89e636 0abb698
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Tue Oct 25 08:34:50 2016 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Tue Oct 25 08:34:50 2016 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileInputOperator.java    |  26 ++-
 .../io/fs/AbstractFileInputOperatorTest.java    | 201 +++++++++++++++++++
 2 files changed, 223 insertions(+), 4 deletions(-)
----------------------------------------------------------------------