You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/09 09:55:01 UTC

[1/3] incubator-apex-core git commit: APEX-97, APEX-96 #resolve

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 e512610ed -> 09f716e00


APEX-97, APEX-96 #resolve


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9b2abe09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9b2abe09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9b2abe09

Branch: refs/heads/devel-3
Commit: 9b2abe09228714bdffdde1991942c70d9a94b50a
Parents: 711fd07
Author: Gaurav <ga...@datatorrent.com>
Authored: Fri Sep 4 15:19:23 2015 -0700
Committer: Gaurav <ga...@datatorrent.com>
Committed: Fri Sep 4 17:59:18 2015 -0700

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java        |  8 ++++-
 .../java/com/datatorrent/stram/engine/Node.java | 31 ++++++++++----------
 .../stram/plan/physical/PhysicalPlan.java       |  5 +++-
 3 files changed, 27 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b2abe09/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index b89ae59..cc8da25 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -66,6 +66,10 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   @Override
   public void save(final Object object, final int operatorId, final long windowId) throws IOException
   {
+    if(syncCheckpoint){
+      super.save(object, operatorId, windowId);
+      return;
+    }
     String operatorIdStr = String.valueOf(operatorId);
     File directory = new File(localBasePath, operatorIdStr);
     if (!directory.exists()) {
@@ -120,7 +124,9 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   @Override
   public Object readResolve() throws ObjectStreamException
   {
-    return new AsyncFSStorageAgent(this.path, null);
+    AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(this.path, null);
+    asyncFSStorageAgent.setSyncCheckpoint(syncCheckpoint);
+    return asyncFSStorageAgent;
   }
 
   public boolean isSyncCheckpoint()

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b2abe09/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index ea33970..7b1e762 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -466,21 +466,22 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
           ba.save(operator, id, windowId);
           if (ba instanceof AsyncFSStorageAgent) {
             AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent) ba;
-            if (!asyncFSStorageAgent.isSyncCheckpoint() && PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
-              CheckpointHandler checkpointHandler = new CheckpointHandler();
-              checkpointHandler.agent = asyncFSStorageAgent;
-              checkpointHandler.operatorId = id;
-              checkpointHandler.windowId = windowId;
-              checkpointHandler.stats = checkpointStats;
-              FutureTask<Stats.CheckpointStats> futureTask = new FutureTask<Stats.CheckpointStats>(checkpointHandler);
-              taskQueue.add(new Pair<FutureTask<Stats.CheckpointStats>, Long>(futureTask, windowId));
-              executorService.submit(futureTask);
-              checkpoint = null;
-              checkpointStats = null;
-              return;
-            }
-            else {
-              asyncFSStorageAgent.copyToHDFS(id, windowId);
+            if (!asyncFSStorageAgent.isSyncCheckpoint()) {
+              if(PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
+                CheckpointHandler checkpointHandler = new CheckpointHandler();
+                checkpointHandler.agent = asyncFSStorageAgent;
+                checkpointHandler.operatorId = id;
+                checkpointHandler.windowId = windowId;
+                checkpointHandler.stats = checkpointStats;
+                FutureTask<Stats.CheckpointStats> futureTask = new FutureTask<Stats.CheckpointStats>(checkpointHandler);
+                taskQueue.add(new Pair<FutureTask<Stats.CheckpointStats>, Long>(futureTask, windowId));
+                executorService.submit(futureTask);
+                checkpoint = null;
+                checkpointStats = null;
+                return;
+              }else{
+                asyncFSStorageAgent.copyToHDFS(id, windowId);
+              }
             }
           }
           checkpointStats.checkpointTime = System.currentTimeMillis() - checkpointStats.checkpointStartTime;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b2abe09/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 2176035..de2e8d5 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -1053,7 +1053,10 @@ public class PhysicalPlan implements Serializable
       StorageAgent agent = oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT);
       agent.save(oo, oper.id, windowId);
       if (agent instanceof AsyncFSStorageAgent) {
-        ((AsyncFSStorageAgent) agent).copyToHDFS(oper.id, windowId);
+        AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent)agent;
+        if(!asyncFSStorageAgent.isSyncCheckpoint()) {
+          asyncFSStorageAgent.copyToHDFS(oper.id, windowId);
+        }
       }
     } catch (IOException e) {
       // inconsistent state, no recovery option, requires shutdown


[3/3] incubator-apex-core git commit: Merge branch 'gaurav-async-checkpoint' into devel-3

Posted by ch...@apache.org.
Merge branch 'gaurav-async-checkpoint' into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/09f716e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/09f716e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/09f716e0

Branch: refs/heads/devel-3
Commit: 09f716e004ef5b8cb990d94f8a8a10f65c6f4f0b
Parents: e512610 243d5af
Author: Chetan Narsude <ch...@datatorrent.com>
Authored: Wed Sep 9 00:54:32 2015 -0700
Committer: Chetan Narsude <ch...@datatorrent.com>
Committed: Wed Sep 9 00:54:32 2015 -0700

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java        |  8 ++++-
 .../java/com/datatorrent/stram/engine/Node.java | 31 ++++++++++----------
 .../stram/plan/physical/PhysicalPlan.java       |  5 +++-
 3 files changed, 27 insertions(+), 17 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-apex-core git commit: Merge branch 'APEX-97' of git://github.com/gauravgopi123/incubator-apex-core into gaurav-async-checkpoint

Posted by ch...@apache.org.
Merge branch 'APEX-97' of git://github.com/gauravgopi123/incubator-apex-core into gaurav-async-checkpoint


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/243d5afd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/243d5afd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/243d5afd

Branch: refs/heads/devel-3
Commit: 243d5afd2d4396118f7099acc3dcf7e427e3287e
Parents: e512610 9b2abe0
Author: Chetan Narsude <ch...@datatorrent.com>
Authored: Wed Sep 9 00:46:35 2015 -0700
Committer: Chetan Narsude <ch...@datatorrent.com>
Committed: Wed Sep 9 00:46:35 2015 -0700

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java        |  8 ++++-
 .../java/com/datatorrent/stram/engine/Node.java | 31 ++++++++++----------
 .../stram/plan/physical/PhysicalPlan.java       |  5 +++-
 3 files changed, 27 insertions(+), 17 deletions(-)
----------------------------------------------------------------------