You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/09 09:55:01 UTC
[1/3] incubator-apex-core git commit: APEX-97, APEX-96 #resolve
Repository: incubator-apex-core
Updated Branches:
refs/heads/devel-3 e512610ed -> 09f716e00
APEX-97, APEX-96 #resolve
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9b2abe09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9b2abe09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9b2abe09
Branch: refs/heads/devel-3
Commit: 9b2abe09228714bdffdde1991942c70d9a94b50a
Parents: 711fd07
Author: Gaurav <ga...@datatorrent.com>
Authored: Fri Sep 4 15:19:23 2015 -0700
Committer: Gaurav <ga...@datatorrent.com>
Committed: Fri Sep 4 17:59:18 2015 -0700
----------------------------------------------------------------------
.../common/util/AsyncFSStorageAgent.java | 8 ++++-
.../java/com/datatorrent/stram/engine/Node.java | 31 ++++++++++----------
.../stram/plan/physical/PhysicalPlan.java | 5 +++-
3 files changed, 27 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b2abe09/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index b89ae59..cc8da25 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -66,6 +66,10 @@ public class AsyncFSStorageAgent extends FSStorageAgent
@Override
public void save(final Object object, final int operatorId, final long windowId) throws IOException
{
+ if(syncCheckpoint){
+ super.save(object, operatorId, windowId);
+ return;
+ }
String operatorIdStr = String.valueOf(operatorId);
File directory = new File(localBasePath, operatorIdStr);
if (!directory.exists()) {
@@ -120,7 +124,9 @@ public class AsyncFSStorageAgent extends FSStorageAgent
@Override
public Object readResolve() throws ObjectStreamException
{
- return new AsyncFSStorageAgent(this.path, null);
+ AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(this.path, null);
+ asyncFSStorageAgent.setSyncCheckpoint(syncCheckpoint);
+ return asyncFSStorageAgent;
}
public boolean isSyncCheckpoint()
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b2abe09/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index ea33970..7b1e762 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -466,21 +466,22 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
ba.save(operator, id, windowId);
if (ba instanceof AsyncFSStorageAgent) {
AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent) ba;
- if (!asyncFSStorageAgent.isSyncCheckpoint() && PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
- CheckpointHandler checkpointHandler = new CheckpointHandler();
- checkpointHandler.agent = asyncFSStorageAgent;
- checkpointHandler.operatorId = id;
- checkpointHandler.windowId = windowId;
- checkpointHandler.stats = checkpointStats;
- FutureTask<Stats.CheckpointStats> futureTask = new FutureTask<Stats.CheckpointStats>(checkpointHandler);
- taskQueue.add(new Pair<FutureTask<Stats.CheckpointStats>, Long>(futureTask, windowId));
- executorService.submit(futureTask);
- checkpoint = null;
- checkpointStats = null;
- return;
- }
- else {
- asyncFSStorageAgent.copyToHDFS(id, windowId);
+ if (!asyncFSStorageAgent.isSyncCheckpoint()) {
+ if(PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
+ CheckpointHandler checkpointHandler = new CheckpointHandler();
+ checkpointHandler.agent = asyncFSStorageAgent;
+ checkpointHandler.operatorId = id;
+ checkpointHandler.windowId = windowId;
+ checkpointHandler.stats = checkpointStats;
+ FutureTask<Stats.CheckpointStats> futureTask = new FutureTask<Stats.CheckpointStats>(checkpointHandler);
+ taskQueue.add(new Pair<FutureTask<Stats.CheckpointStats>, Long>(futureTask, windowId));
+ executorService.submit(futureTask);
+ checkpoint = null;
+ checkpointStats = null;
+ return;
+ }else{
+ asyncFSStorageAgent.copyToHDFS(id, windowId);
+ }
}
}
checkpointStats.checkpointTime = System.currentTimeMillis() - checkpointStats.checkpointStartTime;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b2abe09/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 2176035..de2e8d5 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -1053,7 +1053,10 @@ public class PhysicalPlan implements Serializable
StorageAgent agent = oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT);
agent.save(oo, oper.id, windowId);
if (agent instanceof AsyncFSStorageAgent) {
- ((AsyncFSStorageAgent) agent).copyToHDFS(oper.id, windowId);
+ AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent)agent;
+ if(!asyncFSStorageAgent.isSyncCheckpoint()) {
+ asyncFSStorageAgent.copyToHDFS(oper.id, windowId);
+ }
}
} catch (IOException e) {
// inconsistent state, no recovery option, requires shutdown
[3/3] incubator-apex-core git commit: Merge branch
'gaurav-async-checkpoint' into devel-3
Posted by ch...@apache.org.
Merge branch 'gaurav-async-checkpoint' into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/09f716e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/09f716e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/09f716e0
Branch: refs/heads/devel-3
Commit: 09f716e004ef5b8cb990d94f8a8a10f65c6f4f0b
Parents: e512610 243d5af
Author: Chetan Narsude <ch...@datatorrent.com>
Authored: Wed Sep 9 00:54:32 2015 -0700
Committer: Chetan Narsude <ch...@datatorrent.com>
Committed: Wed Sep 9 00:54:32 2015 -0700
----------------------------------------------------------------------
.../common/util/AsyncFSStorageAgent.java | 8 ++++-
.../java/com/datatorrent/stram/engine/Node.java | 31 ++++++++++----------
.../stram/plan/physical/PhysicalPlan.java | 5 +++-
3 files changed, 27 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-apex-core git commit: Merge branch 'APEX-97' of
git://github.com/gauravgopi123/incubator-apex-core into
gaurav-async-checkpoint
Posted by ch...@apache.org.
Merge branch 'APEX-97' of git://github.com/gauravgopi123/incubator-apex-core into gaurav-async-checkpoint
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/243d5afd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/243d5afd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/243d5afd
Branch: refs/heads/devel-3
Commit: 243d5afd2d4396118f7099acc3dcf7e427e3287e
Parents: e512610 9b2abe0
Author: Chetan Narsude <ch...@datatorrent.com>
Authored: Wed Sep 9 00:46:35 2015 -0700
Committer: Chetan Narsude <ch...@datatorrent.com>
Committed: Wed Sep 9 00:46:35 2015 -0700
----------------------------------------------------------------------
.../common/util/AsyncFSStorageAgent.java | 8 ++++-
.../java/com/datatorrent/stram/engine/Node.java | 31 ++++++++++----------
.../stram/plan/physical/PhysicalPlan.java | 5 +++-
3 files changed, 27 insertions(+), 17 deletions(-)
----------------------------------------------------------------------