You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/10/25 16:03:37 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2254, APEXMALHAR-2269, APEXMALHAR-2270 Fix bugs on replay tuple skipping and idempotency issues.
Repository: apex-malhar
Updated Branches:
refs/heads/master c89e63621 -> 999efccba
APEXMALHAR-2254, APEXMALHAR-2269, APEXMALHAR-2270 Fix bugs on replay tuple skipping and idempotency issues.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0abb698e
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0abb698e
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0abb698e
Branch: refs/heads/master
Commit: 0abb698e5c784c8813943065acccda1993f8cb5d
Parents: c92ca15
Author: Matt Zhang <ma...@datatorrent.com>
Authored: Mon Oct 10 09:49:47 2016 -0700
Committer: Matt Zhang <ma...@gmail.com>
Committed: Sun Oct 23 19:25:51 2016 -0700
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileInputOperator.java | 26 ++-
.../io/fs/AbstractFileInputOperatorTest.java | 201 +++++++++++++++++++
2 files changed, 223 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0abb698e/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index d4ee03b..0f3cc48 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -592,17 +592,27 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
pendingFiles.remove(recoveryEntry.file);
}
inputStream = retryFailedFile(new FailedFile(recoveryEntry.file, recoveryEntry.startOffset));
+
+ while (--skipCount >= 0) {
+ readEntity();
+ }
while (offset < recoveryEntry.endOffset) {
T line = readEntity();
offset++;
emit(line);
}
+ if (recoveryEntry.fileClosed) {
+ closeFile(inputStream);
+ }
} else {
while (offset < recoveryEntry.endOffset) {
T line = readEntity();
offset++;
emit(line);
}
+ if (recoveryEntry.fileClosed) {
+ closeFile(inputStream);
+ }
}
}
}
@@ -654,6 +664,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
if (inputStream != null) {
long startOffset = offset;
String file = currentFile; //current file is reset to null when closed.
+ boolean fileClosed = false;
try {
int counterForTuple = 0;
@@ -662,6 +673,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
if (line == null) {
LOG.info("done reading file ({} entries).", offset);
closeFile(inputStream);
+ fileClosed = true;
break;
}
@@ -679,9 +691,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
} catch (IOException e) {
failureHandling(e);
}
- //Only when something was emitted from the file then we record it for entry.
- if (offset > startOffset) {
- currentWindowRecoveryState.add(new RecoveryEntry(file, startOffset, offset));
+ //Only when something was emitted from the file, or we have a closeFile(), then we record it for entry.
+ if (offset >= startOffset) {
+ currentWindowRecoveryState.add(new RecoveryEntry(file, startOffset, offset, fileClosed));
}
}
}
@@ -1138,6 +1150,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
final String file;
final long startOffset;
final long endOffset;
+ final boolean fileClosed;
@SuppressWarnings("unused")
private RecoveryEntry()
@@ -1145,13 +1158,15 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
file = null;
startOffset = -1;
endOffset = -1;
+ fileClosed = false;
}
- RecoveryEntry(String file, long startOffset, long endOffset)
+ RecoveryEntry(String file, long startOffset, long endOffset, boolean fileClosed)
{
this.file = Preconditions.checkNotNull(file, "file");
this.startOffset = startOffset;
this.endOffset = endOffset;
+ this.fileClosed = fileClosed;
}
@Override
@@ -1172,6 +1187,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
if (startOffset != that.startOffset) {
return false;
}
+ if (fileClosed != that.fileClosed) {
+ return false;
+ }
return file.equals(that.file);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0abb698e/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index 2f926d3..e9346ec 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -18,15 +18,19 @@
*/
package com.datatorrent.lib.io.fs;
+import java.io.ByteArrayOutputStream;
import java.io.File;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.TreeSet;
import org.junit.Assert;
import org.junit.Rule;
@@ -44,6 +48,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -816,6 +822,181 @@ public class AbstractFileInputOperatorTest
}
@Test
+ public void testIdempotencyWithCheckPoint() throws Exception
+ {
+ FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+
+ List<String> lines = Lists.newArrayList();
+ int file = 0;
+ for (int line = 0; line < 5; line++) {
+ lines.add("f" + file + "l" + line);
+ }
+ FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+
+ file = 1;
+ lines = Lists.newArrayList();
+ for (int line = 0; line < 6; line++) {
+ lines.add("f" + file + "l" + line);
+ }
+ FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+
+ // empty file
+ file = 2;
+ lines = Lists.newArrayList();
+ FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+
+
+ LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
+ FSWindowDataManager manager = new FSWindowDataManager();
+ manager.setStatePath(testMeta.dir + "/recovery");
+
+ oper.setWindowDataManager(manager);
+
+ oper.setDirectory(testMeta.dir);
+ oper.getScanner().setFilePatternRegexp(".*file[\\d]");
+
+ oper.setup(testMeta.context);
+
+ oper.setEmitBatchSize(3);
+
+ // sort the pendingFiles and ensure the ordering of the files scanned
+ DirectoryScannerNew newScanner = new DirectoryScannerNew();
+ oper.setScanner(newScanner);
+
+ // scan directory
+ oper.beginWindow(0);
+ oper.emitTuples();
+ oper.endWindow();
+
+ // emit f0l0, f0l1, f0l2
+ oper.beginWindow(1);
+ oper.emitTuples();
+ oper.endWindow();
+
+ //checkpoint the operator
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ LineByLineFileInputOperator checkPointOper = checkpoint(oper, bos);
+
+ // start saving output
+ CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+ TestUtils.setSink(oper.output, queryResults);
+
+ // emit f0l3, f0l4, and closeFile(f0) in the same window
+ oper.beginWindow(2);
+ oper.emitTuples();
+ oper.endWindow();
+ List<String> beforeRecovery2 = Lists.newArrayList(queryResults.collectedTuples);
+
+ // emit f1l0, f1l1, f1l2
+ oper.beginWindow(3);
+ oper.emitTuples();
+ oper.endWindow();
+ List<String> beforeRecovery3 = Lists.newArrayList(queryResults.collectedTuples);
+
+ // emit f1l3, f1l4, f1l5
+ oper.beginWindow(4);
+ oper.emitTuples();
+ oper.endWindow();
+ List<String> beforeRecovery4 = Lists.newArrayList(queryResults.collectedTuples);
+
+ // closeFile(f1) in a new window
+ oper.beginWindow(5);
+ oper.emitTuples();
+ oper.endWindow();
+ List<String> beforeRecovery5 = Lists.newArrayList(queryResults.collectedTuples);
+
+ // empty file ops, closeFile(f2) in emitTuples() only
+ oper.beginWindow(6);
+ oper.emitTuples();
+ oper.endWindow();
+ List<String> beforeRecovery6 = Lists.newArrayList(queryResults.collectedTuples);
+
+ oper.teardown();
+
+ queryResults.clear();
+
+ //idempotency part
+
+ oper = restoreCheckPoint(checkPointOper, bos);
+ testMeta.context.getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L);
+ oper.setup(testMeta.context);
+ TestUtils.setSink(oper.output, queryResults);
+
+ long startwid = testMeta.context.getAttributes().get(Context.OperatorContext.ACTIVATION_WINDOW_ID) + 1;
+
+ oper.beginWindow(startwid);
+ Assert.assertTrue(oper.currentFile == null);
+ oper.emitTuples();
+ oper.endWindow();
+ Assert.assertEquals("lines", beforeRecovery2, queryResults.collectedTuples);
+
+ oper.beginWindow(++startwid);
+ oper.emitTuples();
+ oper.endWindow();
+ Assert.assertEquals("lines", beforeRecovery3, queryResults.collectedTuples);
+
+ oper.beginWindow(++startwid);
+ oper.emitTuples();
+ oper.endWindow();
+ Assert.assertEquals("lines", beforeRecovery4, queryResults.collectedTuples);
+
+ oper.beginWindow(++startwid);
+ Assert.assertTrue(oper.currentFile == null);
+ oper.emitTuples();
+ oper.endWindow();
+ Assert.assertEquals("lines", beforeRecovery5, queryResults.collectedTuples);
+
+ oper.beginWindow(++startwid);
+ Assert.assertTrue(oper.currentFile == null);
+ oper.emitTuples();
+ oper.endWindow();
+ Assert.assertEquals("lines", beforeRecovery6, queryResults.collectedTuples);
+
+ Assert.assertEquals("number tuples", 8, queryResults.collectedTuples.size());
+
+ oper.teardown();
+ }
+
+ /**
+ * This method checkpoints the given operator.
+ * @param oper The operator to checkpoint.
+ * @param bos The ByteArrayOutputStream which saves the checkpoint data temporarily.
+ * @return new operator.
+ */
+ public static LineByLineFileInputOperator checkpoint(LineByLineFileInputOperator oper, ByteArrayOutputStream bos) throws Exception
+ {
+ Kryo kryo = new Kryo();
+
+ Output loutput = new Output(bos);
+ kryo.writeObject(loutput, oper);
+ loutput.close();
+
+ Input lInput = new Input(bos.toByteArray());
+ @SuppressWarnings("unchecked")
+ LineByLineFileInputOperator checkPointedOper = kryo.readObject(lInput, oper.getClass());
+ lInput.close();
+
+ return checkPointedOper;
+ }
+
+ /**
+ * Restores the checkpointed operator.
+ * @param checkPointOper The checkpointed operator.
+ * @param bos The ByteArrayOutputStream which saves the checkpoint data temporarily.
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static LineByLineFileInputOperator restoreCheckPoint(LineByLineFileInputOperator checkPointOper, ByteArrayOutputStream bos) throws Exception
+ {
+ Kryo kryo = new Kryo();
+
+ Input lInput = new Input(bos.toByteArray());
+ LineByLineFileInputOperator oper = kryo.readObject(lInput, checkPointOper.getClass());
+ lInput.close();
+
+ return oper;
+ }
+
+ @Test
public void testWindowDataManagerPartitioning() throws Exception
{
LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
@@ -926,4 +1107,24 @@ public class AbstractFileInputOperatorTest
accepted = scanner.acceptFile("1_file");
Assert.assertFalse("File should not be accepted by this partition ", accepted);
}
+
+ private static class DirectoryScannerNew extends DirectoryScanner
+ {
+ public LinkedHashSet<Path> scan(FileSystem fs, Path filePath, Set<String> consumedFiles)
+ {
+ LinkedHashSet<Path> pathSet;
+ pathSet = super.scan(fs, filePath, consumedFiles);
+
+ TreeSet<Path> orderFiles = new TreeSet<>();
+ orderFiles.addAll(pathSet);
+ pathSet.clear();
+ Iterator<Path> fileIterator = orderFiles.iterator();
+ while (fileIterator.hasNext()) {
+ pathSet.add(fileIterator.next());
+ }
+
+ return pathSet;
+ }
+ }
}
+
[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2254-2269-2270' of github.com:mattqzhang/apex-malhar
Posted by pr...@apache.org.
Merge branch 'APEXMALHAR-2254-2269-2270' of github.com:mattqzhang/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/999efccb
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/999efccb
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/999efccb
Branch: refs/heads/master
Commit: 999efccba519f50c9af9485d2b06794bf4756b34
Parents: c89e636 0abb698
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Tue Oct 25 08:34:50 2016 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Tue Oct 25 08:34:50 2016 -0700
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileInputOperator.java | 26 ++-
.../io/fs/AbstractFileInputOperatorTest.java | 201 +++++++++++++++++++
2 files changed, 223 insertions(+), 4 deletions(-)
----------------------------------------------------------------------