You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/20 14:33:51 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2017 Fixed failing tests.
Repository: apex-malhar
Updated Branches:
refs/heads/master 2cf8bade8 -> 0f8442472
APEXMALHAR-2017 Fixed failing tests.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0f844247
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0f844247
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0f844247
Branch: refs/heads/master
Commit: 0f84424720268e1eebb06992bcda00d2e7d591d4
Parents: 28a7e34
Author: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Authored: Mon Oct 3 09:01:30 2016 -0700
Committer: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Committed: Wed Oct 19 22:28:34 2016 -0700
----------------------------------------------------------------------
.../datatorrent/contrib/hive/HiveMockTest.java | 16 ++++++++++++-
.../lib/io/fs/AbstractFileInputOperator.java | 3 +--
.../lib/io/fs/AbstractFileOutputOperator.java | 6 +++--
.../lib/io/fs/AbstractReconciler.java | 2 +-
.../io/fs/AbstractFileOutputOperatorTest.java | 25 ++++++++++----------
.../AbstractWindowFileOutputOperatorTest.java | 5 ++--
.../lib/io/fs/FSInputModuleAppTest.java | 1 +
.../lib/fs/GenericFileOutputOperatorTest.java | 6 +++++
8 files changed, 43 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
----------------------------------------------------------------------
diff --git a/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java b/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
index 3c64bdf..4ec92c9 100755
--- a/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
+++ b/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
@@ -50,7 +50,6 @@ import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator.FilePartitio
import com.datatorrent.contrib.hive.FSPojoToHiveOperator.FIELD_TYPE;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
-
public class HiveMockTest extends HiveTestService
{
public static final String APP_ID = "HiveOperatorTest";
@@ -250,6 +249,11 @@ public class HiveMockTest extends HiveTestService
}
fsRolling.endWindow();
+
+ if (wid == 6) {
+ fsRolling.beforeCheckpoint(wid);
+ fsRolling.checkpointed(wid);
+ }
}
fsRolling.teardown();
@@ -353,6 +357,11 @@ public class HiveMockTest extends HiveTestService
}
fsRolling.endWindow();
+
+ if (wid == 6) {
+ fsRolling.beforeCheckpoint(wid);
+ fsRolling.checkpointed(wid);
+ }
}
fsRolling.teardown();
@@ -521,6 +530,11 @@ public class HiveMockTest extends HiveTestService
fsRolling.endWindow();
+ if ((wid == 6) || (wid == 9)) {
+ fsRolling.beforeCheckpoint(wid);
+ fsRolling.checkpointed(wid);
+ }
+
if (wid == 9) {
Kryo kryo = new Kryo();
FieldSerializer<HiveOperator> f1 = (FieldSerializer<HiveOperator>)kryo.getSerializer(HiveOperator.class);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index 14cabfc..d4ee03b 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -37,7 +37,6 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import javax.validation.OverridesAttribute;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
@@ -99,7 +98,7 @@ import com.datatorrent.lib.util.KryoCloneUtils;
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
public abstract class AbstractFileInputOperator<T> implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener,
- Operator.CheckpointListener, Operator.CheckpointNotificationListener
+ Operator.CheckpointListener, Operator.CheckpointNotificationListener
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractFileInputOperator.class);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index f703a19..27a56cd 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -262,7 +262,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
private Long expireStreamAfterAccessMillis;
private final Set<String> filesWithOpenStreams;
- private boolean initializeContext;
+ private transient boolean initializeContext;
/**
* This input port receives incoming tuples.
@@ -968,6 +968,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
@Override
public void beginWindow(long windowId)
{
+ // All the filter state needs to be flushed to the disk. Not all filters allow a flush option, so the filters have
+ // to be closed and reopened. If no filter is being used then this is essentially a no-op, as the underlying
+ // FSDataOutputStream is not closed by this operation.
if (initializeContext) {
try {
Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
@@ -1240,7 +1243,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
long start = System.currentTimeMillis();
streamContext.finalizeContext();
totalWritingTime += System.currentTimeMillis() - start;
- //streamContext.resetFilter();
// Re-initialize context when next window starts after checkpoint
initializeContext = true;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
index c12becd..0d67c31 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
@@ -31,8 +31,8 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator.IdleTimeHandler;
import com.datatorrent.api.Operator.CheckpointNotificationListener;
+import com.datatorrent.api.Operator.IdleTimeHandler;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.netlet.util.DTThrowable;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index 5b58121..8f0fbb0 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -66,6 +66,7 @@ import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.RandomWordGenerator;
import com.datatorrent.lib.util.TestUtils;
@@ -245,7 +246,7 @@ public class AbstractFileOutputOperatorTest
*/
public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer, long windowId)
{
- if (windowId >= 0) {
+ if (windowId >= Stateless.WINDOW_ID) {
writer.beforeCheckpoint(windowId);
}
Kryo kryo = new Kryo();
@@ -421,7 +422,7 @@ public class AbstractFileOutputOperatorTest
writer.input.put(1);
writer.endWindow();
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
writer.beginWindow(1);
writer.input.put(2);
@@ -677,7 +678,7 @@ public class AbstractFileOutputOperatorTest
writer.requestFinalize(EVEN_FILE);
writer.endWindow();
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
writer.beginWindow(1);
writer.input.put(4);
@@ -821,7 +822,7 @@ public class AbstractFileOutputOperatorTest
writer.input.put(2);
writer.endWindow();
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
writer.beginWindow(1);
writer.input.put(3);
@@ -868,8 +869,8 @@ public class AbstractFileOutputOperatorTest
writer.input.put(4);
writer.endWindow();
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
- AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, -1);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
+ AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, Stateless.WINDOW_ID);
LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets);
@@ -1134,7 +1135,7 @@ public class AbstractFileOutputOperatorTest
writer.input.put(1);
writer.endWindow();
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
writer.beginWindow(1);
writer.input.put(2);
@@ -1231,7 +1232,7 @@ public class AbstractFileOutputOperatorTest
writer.input.put(1);
writer.endWindow();
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
writer.beginWindow(1);
writer.input.put(2);
@@ -1278,7 +1279,7 @@ public class AbstractFileOutputOperatorTest
writer.input.process(1);
writer.endWindow();
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
writer.beginWindow(1);
writer.input.process(2);
@@ -1369,7 +1370,7 @@ public class AbstractFileOutputOperatorTest
writer.input.put(3);
writer.input.put(4);
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer, -1);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
writer.input.put(3);
writer.input.put(4);
@@ -1766,13 +1767,11 @@ public class AbstractFileOutputOperatorTest
for (int j = 0; j < 1000; ++j) {
writer.input.put(i);
}
- //writer.endWindow();
+ writer.endWindow();
if ((i % 2) == 1) {
writer.beforeCheckpoint(i);
evenOffsets.add(evenCounterContext.getCounter());
oddOffsets.add(oddCounterContext.getCounter());
- //evenOffsets.add(evenFile.length());
- //oddOffsets.add(oddFile.length());
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
index bcb1bc3..daebecb 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
@@ -22,6 +22,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.Description;
+import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
import com.datatorrent.lib.util.TestUtils.TestInfo;
@@ -70,7 +71,7 @@ public class AbstractWindowFileOutputOperatorTest
oper.input.process("window 0");
oper.endWindow();
- AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, 0);
+ AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, Stateless.WINDOW_ID);
oper.beginWindow(1);
oper.input.process("window 1");
@@ -110,7 +111,7 @@ public class AbstractWindowFileOutputOperatorTest
oper.beginWindow(1);
oper.input.process("1");
- AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, -1);
+ AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, Stateless.WINDOW_ID);
oper.input.process("1");
oper.teardown();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
index 4213a00..4042d3c 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
@@ -99,6 +99,7 @@ public class FSInputModuleAppTest
conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10");
conf.set("dt.operator.hdfsInputModule.prop.blocksThreshold", "4");
conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000");
+ conf.set("dt.attr.CHECKPOINT_WINDOW_COUNT","10");
LocalMode lma = LocalMode.newInstance();
lma.prepareDAG(app, conf);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
index 6082f57..8b8ed01 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
@@ -62,6 +62,7 @@ public class GenericFileOutputOperatorTest extends AbstractFileOutputOperatorTes
}
writer.endWindow();
}
+ checkpoint(writer, 10);
writer.committed(10);
for (int i = 13; i <= 26; i++) {
@@ -71,7 +72,10 @@ public class GenericFileOutputOperatorTest extends AbstractFileOutputOperatorTes
}
writer.endWindow();
}
+ checkpoint(writer, 20);
writer.committed(20);
+
+ checkpoint(writer, 26);
writer.committed(26);
String[] expected = {"0a\n0b\n1a\n1b\n6a\n6b\n7a\n7b\n", "13a\n13b\n14a\n14b\n18a\n18b\n19a\n19b\n",
@@ -108,8 +112,10 @@ public class GenericFileOutputOperatorTest extends AbstractFileOutputOperatorTes
}
writer.endWindow();
if (i % 10 == 0) {
+ checkpoint(writer, 10);
writer.committed(10);
}
+ checkpoint(writer, 24);
}
writer.committed(tuples.length);
[2/2] apex-malhar git commit: Using CheckpointNotificationListener
and beforeCheckpoint callback to do IO in a more optimized fashion
Posted by th...@apache.org.
Using CheckpointNotificationListener and beforeCheckpoint callback to do IO in a more optimized fashion
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/28a7e347
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/28a7e347
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/28a7e347
Branch: refs/heads/master
Commit: 28a7e347691dc5dbe364888c4520f5c31139bc39
Parents: 2cf8bad
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Wed Mar 16 20:57:39 2016 -0700
Committer: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Committed: Wed Oct 19 22:28:34 2016 -0700
----------------------------------------------------------------------
.../hbase/AbstractHBasePutOutputOperator.java | 57 +++++++-------------
.../kinesis/AbstractKinesisInputOperator.java | 8 ++-
.../kinesis/AbstractKinesisOutputOperator.java | 18 ++++++-
.../rabbitmq/AbstractRabbitMQInputOperator.java | 7 ++-
.../redis/AbstractRedisInputOperator.java | 9 +++-
.../contrib/splunk/SplunkTcpOutputOperator.java | 17 ++++--
.../hive/AbstractFSRollingOutputOperator.java | 5 +-
.../lib/io/fs/AbstractFileInputOperator.java | 10 +++-
.../lib/io/fs/AbstractFileOutputOperator.java | 49 ++++++++++-------
.../lib/io/fs/AbstractReconciler.java | 12 +++--
.../com/datatorrent/lib/io/fs/FileSplitter.java | 7 ++-
.../lib/io/fs/FileSplitterInput.java | 7 ++-
.../lib/io/jms/AbstractJMSInputOperator.java | 7 ++-
.../lib/join/AbstractJoinOperator.java | 7 ++-
.../io/fs/AbstractFileOutputOperatorTest.java | 47 ++++++++++------
.../AbstractWindowFileOutputOperatorTest.java | 4 +-
16 files changed, 171 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java
index 3973008..7f93394 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java
@@ -22,6 +22,7 @@ import java.io.InterruptedIOException;
import javax.validation.constraints.Min;
+import com.datatorrent.api.Operator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,27 +53,21 @@ import com.datatorrent.netlet.util.DTThrowable;
* The tuple type
* @since 1.0.2
*/
-public abstract class AbstractHBasePutOutputOperator<T> extends AbstractStoreOutputOperator<T, HBaseStore> {
+public abstract class AbstractHBasePutOutputOperator<T> extends AbstractStoreOutputOperator<T, HBaseStore> implements Operator.CheckpointNotificationListener {
private static final transient Logger logger = LoggerFactory.getLogger(AbstractHBasePutOutputOperator.class);
- public static final int DEFAULT_BATCH_SIZE = 1000;
- private int batchSize = DEFAULT_BATCH_SIZE;
- protected int unCommittedSize = 0;
- public AbstractHBasePutOutputOperator() {
+ public AbstractHBasePutOutputOperator()
+ {
store = new HBaseStore();
}
@Override
- public void processTuple(T tuple) {
+ public void processTuple(T tuple)
+ {
HTable table = store.getTable();
Put put = operationPut(tuple);
try {
table.put(put);
- if( ++unCommittedSize >= batchSize )
- {
- table.flushCommits();
- unCommittedSize = 0;
- }
} catch (RetriesExhaustedWithDetailsException e) {
logger.error("Could not output tuple", e);
DTThrowable.rethrow(e);
@@ -80,46 +75,30 @@ public abstract class AbstractHBasePutOutputOperator<T> extends AbstractStoreOut
logger.error("Could not output tuple", e);
DTThrowable.rethrow(e);
}
-
}
@Override
- public void endWindow()
+ public void beforeCheckpoint(long windowId)
{
- try
- {
- if( unCommittedSize > 0 ) {
- store.getTable().flushCommits();
- unCommittedSize = 0;
- }
- }
- catch (RetriesExhaustedWithDetailsException e) {
- logger.error("Could not output tuple", e);
- DTThrowable.rethrow(e);
+ try {
+ store.getTable().flushCommits();
} catch (InterruptedIOException e) {
- logger.error("Could not output tuple", e);
+ DTThrowable.rethrow(e);
+ } catch (RetriesExhaustedWithDetailsException e) {
DTThrowable.rethrow(e);
}
}
- public abstract Put operationPut(T t);
+ @Override
+ public void checkpointed(long l) {
- /**
- * the batch size save flush data
- */
- @Min(1)
- public int getBatchSize()
- {
- return batchSize;
}
- /**
- * the batch size save flush data
- */
- public void setBatchSize(int batchSize)
- {
- this.batchSize = batchSize;
- }
+ @Override
+ public void committed(long l) {
+ }
+ public abstract Put operationPut(T t);
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index c03df21..fc10bea 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -86,7 +86,7 @@ class KinesisPair <F, S> extends Pair<F, S>
* @since 2.0.0
*/
@SuppressWarnings("rawtypes")
-public abstract class AbstractKinesisInputOperator <T> implements InputOperator, ActivationListener<OperatorContext>, Partitioner<AbstractKinesisInputOperator>, StatsListener,Operator.CheckpointListener
+public abstract class AbstractKinesisInputOperator <T> implements InputOperator, ActivationListener<OperatorContext>, Partitioner<AbstractKinesisInputOperator>, StatsListener,Operator.CheckpointNotificationListener
{
private static final Logger logger = LoggerFactory.getLogger(AbstractKinesisInputOperator.class);
@@ -530,6 +530,12 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
public void checkpointed(long windowId)
{
}
+
+ @Override
+ public void beforeCheckpoint(long windowId)
+ {
+ }
+
/**
* Implement ActivationListener Interface.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
index 43fc62a..d6f9d36 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
@@ -49,7 +49,7 @@ import java.util.List;
* @param <T>
* @since 2.0.0
*/
-public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
+public abstract class AbstractKinesisOutputOperator<V, T> implements Operator, Operator.CheckpointNotificationListener
{
private static final Logger logger = LoggerFactory.getLogger( AbstractKinesisOutputOperator.class );
protected String streamName;
@@ -91,6 +91,10 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
{
}
+ @Override
+ public void endWindow() {
+ }
+
/**
* Implement Component Interface.
*/
@@ -103,7 +107,7 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
* Implement Operator Interface.
*/
@Override
- public void endWindow()
+ public void beforeCheckpoint(long windowId)
{
if (isBatchProcessing && putRecordsRequestEntryList.size() != 0) {
try {
@@ -114,6 +118,16 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
}
}
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ }
+
/**
* Implement Component Interface.
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
index 08157bc..672122c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
@@ -77,7 +77,7 @@ import org.apache.apex.malhar.lib.wal.WindowDataManager;
*/
public abstract class AbstractRabbitMQInputOperator<T> implements
InputOperator, Operator.ActivationListener<OperatorContext>,
- Operator.CheckpointListener
+ Operator.CheckpointNotificationListener
{
private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitMQInputOperator.class);
@NotNull
@@ -312,6 +312,11 @@ public abstract class AbstractRabbitMQInputOperator<T> implements
}
@Override
+ public void beforeCheckpoint(long windowId)
+ {
+ }
+
+ @Override
public void checkpointed(long windowId)
{
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index 0b12574..092a3c6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -31,7 +31,7 @@ import org.apache.apex.malhar.lib.wal.WindowDataManager;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
-import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Operator.CheckpointNotificationListener;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
@@ -47,7 +47,7 @@ import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
* The tuple type.
* @since 0.9.3
*/
-public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore> implements CheckpointListener
+public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore> implements CheckpointNotificationListener
{
protected transient List<String> keys = new ArrayList<String>();
protected transient Integer scanOffset;
@@ -225,6 +225,11 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor
abstract public void processTuples();
@Override
+ public void beforeCheckpoint(long windowId)
+ {
+ }
+
+ @Override
public void checkpointed(long windowId)
{
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java
index cb81375..f7e98b6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java
@@ -19,6 +19,7 @@
package com.datatorrent.contrib.splunk;
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Operator;
import com.datatorrent.lib.db.AbstractStoreOutputOperator;
import com.splunk.TcpInput;
@@ -34,7 +35,7 @@ import java.net.Socket;
* @tags splunk
* @since 1.0.4
*/
-public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, SplunkStore> {
+public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, SplunkStore> implements Operator.CheckpointNotificationListener {
private String tcpPort;
private transient Socket socket;
@@ -75,8 +76,8 @@ public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, S
}
@Override
- public void endWindow() {
-
+ public void beforeCheckpoint(long windowId)
+ {
try {
stream.flush();
} catch (IOException e) {
@@ -85,6 +86,16 @@ public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, S
}
@Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ }
+
+ @Override
public void teardown() {
super.teardown();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
----------------------------------------------------------------------
diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
index 3c9c4da..37b5f2e 100755
--- a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
+++ b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
@@ -37,7 +37,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Operator.CheckpointNotificationListener;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import com.datatorrent.netlet.util.DTThrowable;
@@ -52,8 +52,7 @@ import com.datatorrent.netlet.util.DTThrowable;
*
* @since 2.1.0
*/
-public abstract class AbstractFSRollingOutputOperator<T> extends AbstractFileOutputOperator<T>
- implements CheckpointListener
+public abstract class AbstractFSRollingOutputOperator<T> extends AbstractFileOutputOperator<T> implements CheckpointNotificationListener
{
private transient String outputFilePath;
protected MutableInt partNumber;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index 9e80b4e..14cabfc 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -37,6 +37,7 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.validation.OverridesAttribute;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
@@ -97,8 +98,8 @@ import com.datatorrent.lib.util.KryoCloneUtils;
* @since 1.0.2
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
-public abstract class AbstractFileInputOperator<T>
- implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener, Operator.CheckpointListener
+public abstract class AbstractFileInputOperator<T> implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener,
+ Operator.CheckpointListener, Operator.CheckpointNotificationListener
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractFileInputOperator.class);
@@ -909,6 +910,11 @@ public abstract class AbstractFileInputOperator<T>
}
@Override
+ public void beforeCheckpoint(long windowId)
+ {
+ }
+
+ @Override
public void checkpointed(long windowId)
{
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index ab8bedc..f703a19 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -110,7 +110,7 @@ import com.datatorrent.lib.counters.BasicCounters;
* @since 2.0.0
*/
@OperatorAnnotation(checkpointableWithinAppWindow = false)
-public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator implements Operator.CheckpointListener
+public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator implements Operator.CheckpointListener, Operator.CheckpointNotificationListener
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractFileOutputOperator.class);
@@ -262,6 +262,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
private Long expireStreamAfterAccessMillis;
private final Set<String> filesWithOpenStreams;
+ private boolean initializeContext;
+
/**
* This input port receives incoming tuples.
*/
@@ -966,13 +968,16 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
@Override
public void beginWindow(long windowId)
{
- try {
- Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
- for (FSFilterStreamContext streamContext : openStreams.values()) {
- streamContext.initializeContext();
+ if (initializeContext) {
+ try {
+ Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
+ for (FSFilterStreamContext streamContext : openStreams.values()) {
+ streamContext.initializeContext();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- } catch (IOException e) {
- throw new RuntimeException(e);
+ initializeContext = false;
}
currentWindow = windowId;
}
@@ -980,18 +985,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
@Override
public void endWindow()
{
- try {
- Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
- for (FSFilterStreamContext streamContext: openStreams.values()) {
- long start = System.currentTimeMillis();
- streamContext.finalizeContext();
- totalWritingTime += System.currentTimeMillis() - start;
- //streamContext.resetFilter();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
if (rotationWindows > 0) {
if (++rotationCount == rotationWindows) {
rotationCount = 0;
@@ -1239,6 +1232,24 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
}
@Override
+ public void beforeCheckpoint(long l)
+ {
+ try {
+ Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
+ for (FSFilterStreamContext streamContext: openStreams.values()) {
+ long start = System.currentTimeMillis();
+ streamContext.finalizeContext();
+ totalWritingTime += System.currentTimeMillis() - start;
+ //streamContext.resetFilter();
+ // Re-initialize context when next window starts after checkpoint
+ initializeContext = true;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
public void checkpointed(long l)
{
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
index 945c000..c12becd 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
@@ -25,17 +25,14 @@ import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
-
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator.CheckpointListener;
import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Operator.CheckpointNotificationListener;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.netlet.util.DTThrowable;
@@ -55,7 +52,7 @@ import com.datatorrent.netlet.util.DTThrowable;
* @since 2.0.0
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
-public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator implements CheckpointListener, IdleTimeHandler
+public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator implements CheckpointNotificationListener, IdleTimeHandler
{
private static final Logger logger = LoggerFactory.getLogger(AbstractReconciler.class);
public transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>()
@@ -125,6 +122,11 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
}
@Override
+ public void beforeCheckpoint(long l)
+ {
+ }
+
+ @Override
public void checkpointed(long l)
{
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
index 4bb53e5..b9594b3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
@@ -79,7 +79,7 @@ import com.datatorrent.netlet.util.DTThrowable;
*/
@OperatorAnnotation(checkpointableWithinAppWindow = false)
@Deprecated
-public class FileSplitter implements InputOperator, Operator.CheckpointListener
+public class FileSplitter implements InputOperator, Operator.CheckpointListener, Operator.CheckpointNotificationListener
{
protected Long blockSize;
private int sequenceNo;
@@ -380,6 +380,11 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
}
@Override
+ public void beforeCheckpoint(long l)
+ {
+ }
+
+ @Override
public void checkpointed(long l)
{
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index 2d290cd..745f953 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -76,7 +76,7 @@ import com.datatorrent.api.annotation.Stateless;
* @since 2.0.0
*/
@OperatorAnnotation(checkpointableWithinAppWindow = false)
-public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener
+public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener, Operator.CheckpointNotificationListener
{
@NotNull
private WindowDataManager windowDataManager;
@@ -219,6 +219,11 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
}
@Override
+ public void beforeCheckpoint(long l)
+ {
+ }
+
+ @Override
public void checkpointed(long l)
{
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
index 2b8b58d..72bf63c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
@@ -80,7 +80,7 @@ import com.datatorrent.lib.counters.BasicCounters;
@OperatorAnnotation(checkpointableWithinAppWindow = false)
public abstract class AbstractJMSInputOperator<T> extends JMSBase
implements InputOperator, ActivationListener<OperatorContext>, MessageListener, ExceptionListener,
- Operator.IdleTimeHandler, Operator.CheckpointListener
+ Operator.IdleTimeHandler, Operator.CheckpointListener, Operator.CheckpointNotificationListener
{
protected static final int DEFAULT_BUFFER_SIZE = 10 * 1024; // 10k
@@ -395,6 +395,11 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase
}
@Override
+ public void beforeCheckpoint(long windowId)
+ {
+ }
+
+ @Override
public void checkpointed(long windowId)
{
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
index 89df25e..d0f722d 100644
--- a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
@@ -64,7 +64,7 @@ import com.datatorrent.common.util.BaseOperator;
* @since 3.4.0
*/
@InterfaceStability.Unstable
-public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
+public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointNotificationListener
{
@AutoMetric
private long tuplesJoinedPerSec;
@@ -226,6 +226,11 @@ public abstract class AbstractJoinOperator<T> extends BaseOperator implements Op
}
@Override
+ public void beforeCheckpoint(long windowId)
+ {
+ }
+
+ @Override
public void checkpointed(long windowId)
{
leftStore.getStore().checkpointed(windowId);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index 03f3bf6..5b58121 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -243,8 +243,11 @@ public class AbstractFileOutputOperatorTest
* @param writer The writer to checkpoint.
* @return new writer.
*/
- public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer)
+ public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer, long windowId)
{
+ if (windowId >= 0) {
+ writer.beforeCheckpoint(windowId);
+ }
Kryo kryo = new Kryo();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Output loutput = new Output(bos);
@@ -418,7 +421,7 @@ public class AbstractFileOutputOperatorTest
writer.input.put(1);
writer.endWindow();
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
writer.beginWindow(1);
writer.input.put(2);
@@ -577,6 +580,7 @@ public class AbstractFileOutputOperatorTest
writer.requestFinalize(EVEN_FILE);
writer.requestFinalize(ODD_FILE);
+ writer.beforeCheckpoint(1);
writer.committed(1);
}
@@ -603,6 +607,7 @@ public class AbstractFileOutputOperatorTest
writer.requestFinalize(ODD_FILE);
writer.requestFinalize(EVEN_FILE);
+ writer.beforeCheckpoint(1);
writer.committed(1);
}
@@ -672,7 +677,7 @@ public class AbstractFileOutputOperatorTest
writer.requestFinalize(EVEN_FILE);
writer.endWindow();
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
writer.beginWindow(1);
writer.input.put(4);
@@ -690,6 +695,7 @@ public class AbstractFileOutputOperatorTest
writer.input.put(8);
writer.input.put(9);
writer.endWindow();
+ writer.beforeCheckpoint(2);
writer.committed(2);
}
@@ -815,7 +821,7 @@ public class AbstractFileOutputOperatorTest
writer.input.put(2);
writer.endWindow();
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
writer.beginWindow(1);
writer.input.put(3);
@@ -862,8 +868,8 @@ public class AbstractFileOutputOperatorTest
writer.input.put(4);
writer.endWindow();
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
- AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+ AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, -1);
LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets);
@@ -1128,7 +1134,7 @@ public class AbstractFileOutputOperatorTest
writer.input.put(1);
writer.endWindow();
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
writer.beginWindow(1);
writer.input.put(2);
@@ -1225,7 +1231,7 @@ public class AbstractFileOutputOperatorTest
writer.input.put(1);
writer.endWindow();
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
writer.beginWindow(1);
writer.input.put(2);
@@ -1272,7 +1278,7 @@ public class AbstractFileOutputOperatorTest
writer.input.process(1);
writer.endWindow();
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
writer.beginWindow(1);
writer.input.process(2);
@@ -1363,7 +1369,7 @@ public class AbstractFileOutputOperatorTest
writer.input.put(3);
writer.input.put(4);
- AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+ AbstractFileOutputOperator checkPointWriter = checkpoint(writer, -1);
writer.input.put(3);
writer.input.put(4);
@@ -1554,8 +1560,11 @@ public class AbstractFileOutputOperatorTest
writer.input.put(i);
}
writer.endWindow();
- evenOffsets.add(evenFile.length());
- oddOffsets.add(oddFile.length());
+ if ((i % 2) == 1) {
+ writer.beforeCheckpoint(i);
+ evenOffsets.add(evenFile.length());
+ oddOffsets.add(oddFile.length());
+ }
}
writer.teardown();
@@ -1580,6 +1589,7 @@ public class AbstractFileOutputOperatorTest
writer.input.put(2);
writer.input.put(3);
writer.endWindow();
+ writer.beforeCheckpoint(0);
//failure and restored
writer.setup(testMeta.testOperatorContext);
@@ -1756,11 +1766,14 @@ public class AbstractFileOutputOperatorTest
for (int j = 0; j < 1000; ++j) {
writer.input.put(i);
}
- writer.endWindow();
- evenOffsets.add(evenCounterContext.getCounter());
- oddOffsets.add(oddCounterContext.getCounter());
- //evenOffsets.add(evenFile.length());
- //oddOffsets.add(oddFile.length());
+ //writer.endWindow();
+ if ((i % 2) == 1) {
+ writer.beforeCheckpoint(i);
+ evenOffsets.add(evenCounterContext.getCounter());
+ oddOffsets.add(oddCounterContext.getCounter());
+ //evenOffsets.add(evenFile.length());
+ //oddOffsets.add(oddFile.length());
+ }
}
writer.teardown();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
index 32c32f7..bcb1bc3 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
@@ -70,7 +70,7 @@ public class AbstractWindowFileOutputOperatorTest
oper.input.process("window 0");
oper.endWindow();
- AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper);
+ AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, 0);
oper.beginWindow(1);
oper.input.process("window 1");
@@ -110,7 +110,7 @@ public class AbstractWindowFileOutputOperatorTest
oper.beginWindow(1);
oper.input.process("1");
- AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper);
+ AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, -1);
oper.input.process("1");
oper.teardown();