Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:13:05 UTC

[34/50] incubator-apex-core git commit: APEX-37 Added APPLICATION_ATTEMPT_ID attribute and changed the operator and container history files so that different application attempts write to separate HDFS files

APEX-37 Added APPLICATION_ATTEMPT_ID attribute and changed the operator and container history files so that different application attempts write to separate HDFS files
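In short: container and operator history records are now written to per-attempt files whose names embed the attempt number. A minimal sketch of the naming scheme (the attempt value below is a stand-in for LogicalPlan.APPLICATION_ATTEMPT_ID, which defaults to 1):

public class AttemptFileNamesSketch
{
  public static void main(String[] args)
  {
    // Stand-in for the value of LogicalPlan.APPLICATION_ATTEMPT_ID.
    int attemptId = 2;

    // Formats added to StreamingContainerManager in this commit.
    String containersFile = String.format("containers_%d.json", attemptId);
    String operatorsFile = String.format("operators_%d.json", attemptId);

    // Each attempt appends to its own files under the application path,
    // e.g. <appPath>/containers_2.json and <appPath>/operators_2.json,
    // instead of all attempts sharing the old "containers" and "operators/<id>" files.
    System.out.println(containersFile + " / " + operatorsFile);
  }
}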


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b8c0b4cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b8c0b4cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b8c0b4cc

Branch: refs/heads/master
Commit: b8c0b4cc06b1d3b4a94c9027da0cb44da83f4c28
Parents: 9d08532
Author: David Yan <da...@datatorrent.com>
Authored: Thu Aug 6 18:35:32 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Tue Aug 18 13:45:14 2015 -0700

----------------------------------------------------------------------
 .../stram/StreamingAppMasterService.java        |  2 +-
 .../stram/StreamingContainerManager.java        | 66 +++++++++++---------
 .../stram/plan/logical/LogicalPlan.java         |  5 ++
 .../datatorrent/stram/util/FSJsonLineFile.java  | 24 ++-----
 .../datatorrent/stram/StramRecoveryTest.java    |  2 +-
 5 files changed, 46 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 5246c9e..98c78de 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -511,7 +511,7 @@ public class StreamingAppMasterService extends CompositeService
     if (dag.isDebug()) {
       dumpOutDebugInfo();
     }
-
+    dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
     FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(dag.assertAppPath(), conf);
     this.dnmgr = StreamingContainerManager.getInstance(recoveryHandler, dag, true);
     dag = this.dnmgr.getLogicalPlan();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 64850f5..6e0f3f5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -127,6 +127,8 @@ public class StreamingContainerManager implements PlanContext
   private final static Logger LOG = LoggerFactory.getLogger(StreamingContainerManager.class);
   public final static String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
   public final static String BUILTIN_APPDATA_URL = "builtin";
+  public final static String CONTAINERS_INFO_FILENAME_FORMAT = "containers_%d.json";
+  public final static String OPERATORS_INFO_FILENAME_FORMAT = "operators_%d.json";
   public final static String APP_META_FILENAME = "meta.json";
   public final static String APP_META_KEY_ATTRIBUTES = "attributes";
   public final static String APP_META_KEY_METRICS = "metrics";
@@ -205,7 +207,7 @@ public class StreamingContainerManager implements PlanContext
   };
 
   private FSJsonLineFile containerFile;
-  private final ConcurrentMap<Integer, FSJsonLineFile> operatorFiles = Maps.newConcurrentMap();
+  private FSJsonLineFile operatorFile;
 
   private final long startTime = System.currentTimeMillis();
 
@@ -359,8 +361,11 @@ public class StreamingContainerManager implements PlanContext
       Configuration config = new YarnConfiguration();
       fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config);
       saveMetaInfo();
-      this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + "/containers"), FsPermission.getDefault());
+      String fileName = String.format(CONTAINERS_INFO_FILENAME_FORMAT, plan.getLogicalPlan().getValue(LogicalPlan.APPLICATION_ATTEMPT_ID));
+      this.containerFile = new FSJsonLineFile(fileContext, new Path(this.vars.appPath, fileName), FsPermission.getDefault());
       this.containerFile.append(getAppMasterContainerInfo());
+      fileName = String.format(OPERATORS_INFO_FILENAME_FORMAT, plan.getLogicalPlan().getValue(LogicalPlan.APPLICATION_ATTEMPT_ID));
+      this.operatorFile = new FSJsonLineFile(fileContext, new Path(this.vars.appPath, fileName), FsPermission.getDefault());
     } catch (IOException ex) {
       throw DTThrowable.wrapIfChecked(ex);
     }
@@ -490,9 +495,7 @@ public class StreamingContainerManager implements PlanContext
     }
 
     IOUtils.closeQuietly(containerFile);
-    for (FSJsonLineFile operatorFile : operatorFiles.values()) {
-      IOUtils.closeQuietly(operatorFile);
-    }
+    IOUtils.closeQuietly(operatorFile);
     if(poolExecutor != null) {
       poolExecutor.shutdown();
     }
@@ -854,8 +857,7 @@ public class StreamingContainerManager implements PlanContext
   private void saveMetaInfo() throws IOException
   {
     Path file = new Path(this.vars.appPath, APP_META_FILENAME + "." + System.nanoTime());
-    try (FSDataOutputStream os = fileContext.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent())) {
-      JSONObject top = new JSONObject();
+    try (FSDataOutputStream os = fileContext.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent())) {      JSONObject top = new JSONObject();
       JSONObject attributes = new JSONObject();
       for (Map.Entry<Attribute<?>, Object> entry : this.plan.getLogicalPlan().getAttributes().entrySet()) {
         attributes.put(entry.getKey().getSimpleName(), entry.getValue());
@@ -1379,7 +1381,7 @@ public class StreamingContainerManager implements PlanContext
   {
     long currentTimeMillis = clock.getTime();
 
-    StreamingContainerAgent sca = this.containers.get(heartbeat.getContainerId());
+    final StreamingContainerAgent sca = this.containers.get(heartbeat.getContainerId());
     if (sca == null || sca.container.getState() == PTContainer.State.KILLED) {
       // could be orphaned container that was replaced and needs to terminate
       LOG.error("Unknown container {}", heartbeat.getContainerId());
@@ -1395,34 +1397,35 @@ public class StreamingContainerManager implements PlanContext
         sca.container.bufferServerAddress = InetSocketAddress.createUnresolved(heartbeat.bufferServerHost, heartbeat.bufferServerPort);
         LOG.info("Container {} buffer server: {}", sca.container.getExternalId(), sca.container.bufferServerAddress);
       }
-      long containerStartTime = System.currentTimeMillis();
+      final long containerStartTime = System.currentTimeMillis();
       sca.container.setState(PTContainer.State.ACTIVE);
       sca.container.setStartedTime(containerStartTime);
       sca.container.setFinishedTime(-1);
       sca.jvmName = heartbeat.jvmName;
-      try {
-        containerFile.append(sca.getContainerInfo());
-      }
-      catch (IOException ex) {
-        LOG.warn("Cannot write to container file");
-      }
-      for (PTOperator ptOp : sca.container.getOperators()) {
-        try {
-          FSJsonLineFile operatorFile = operatorFiles.get(ptOp.getId());
-          if (operatorFile == null) {
-            operatorFiles.putIfAbsent(ptOp.getId(), new FSJsonLineFile(new Path(this.vars.appPath + "/operators/" + ptOp.getId()), FsPermission.getDefault()));
-            operatorFile = operatorFiles.get(ptOp.getId());
+      poolExecutor.submit(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          try {
+            containerFile.append(sca.getContainerInfo());
+          } catch (IOException ex) {
+            LOG.warn("Cannot write to container file");
+          }
+          for (PTOperator ptOp : sca.container.getOperators()) {
+            try {
+              JSONObject operatorInfo = new JSONObject();
+              operatorInfo.put("name", ptOp.getName());
+              operatorInfo.put("id", ptOp.getId());
+              operatorInfo.put("container", sca.container.getExternalId());
+              operatorInfo.put("startTime", containerStartTime);
+              operatorFile.append(operatorInfo);
+            } catch (IOException | JSONException ex) {
+              LOG.warn("Cannot write to operator file: ", ex);
+            }
           }
-          JSONObject operatorInfo = new JSONObject();
-          operatorInfo.put("name", ptOp.getName());
-          operatorInfo.put("container", sca.container.getExternalId());
-          operatorInfo.put("startTime", containerStartTime);
-          operatorFile.append(operatorInfo);
-        }
-        catch (Exception ex) {
-          LOG.warn("Cannot write to operator file: ", ex);
         }
-      }
+      });
     }
 
     if (heartbeat.restartRequested) {
@@ -2823,9 +2826,10 @@ public class StreamingContainerManager implements PlanContext
         scm = new StreamingContainerManager(dag, enableEventRecording, new SystemClock());
       }
       else {
-        scm = new StreamingContainerManager(checkpointedState, enableEventRecording);
         // find better way to support final transient members
         PhysicalPlan plan = checkpointedState.physicalPlan;
+        plan.getLogicalPlan().setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, dag.getAttributes().get(LogicalPlan.APPLICATION_ATTEMPT_ID));
+        scm = new StreamingContainerManager(checkpointedState, enableEventRecording);
         for (Field f : plan.getClass().getDeclaredFields()) {
           if (f.getType() == PlanContext.class) {
             f.setAccessible(true);
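A structural note on the hunk above: the container and operator history writes now run on poolExecutor instead of on the heartbeat-processing path. A generic sketch of that pattern, detached from the class (the executor here is a stand-in, not the manager's actual poolExecutor field):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncAppendSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    // Stand-in for the manager's poolExecutor.
    ExecutorService poolExecutor = Executors.newSingleThreadExecutor();

    // Hand the potentially slow HDFS appends off to a pool thread so heartbeat
    // processing is not blocked on filesystem latency.
    poolExecutor.submit(new Runnable()
    {
      @Override
      public void run()
      {
        // containerFile.append(...) / operatorFile.append(...) would go here.
        System.out.println("append container and operator info");
      }
    });

    poolExecutor.shutdown();
    poolExecutor.awaitTermination(10, TimeUnit.SECONDS);
  }
}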

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index d140d17..2d088b8 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -130,6 +130,11 @@ public class LogicalPlan implements Serializable, DAG
    */
   public static Attribute<Integer> CONTAINERS_MAX_COUNT = new Attribute<Integer>(Integer.MAX_VALUE);
 
+  /**
+   * The application attempt ID from YARN
+   */
+  public static Attribute<Integer> APPLICATION_ATTEMPT_ID = new Attribute<>(1);
+
   static {
     Attribute.AttributeMap.AttributeInitializer.initialize(LogicalPlan.class);
   }
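For readers setting or reading the new attribute outside this diff, a small sketch (assumes apex-core on the classpath; the literal attempt value is only illustrative):

import com.datatorrent.stram.plan.logical.LogicalPlan;

public class AttemptAttributeSketch
{
  public static void main(String[] args)
  {
    LogicalPlan dag = new LogicalPlan();

    // The app master sets this from YARN, e.g.
    // dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
    dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, 2);

    // Consumers read it back with getValue(), as StreamingContainerManager does
    // when building the per-attempt history file names.
    Integer attempt = dag.getValue(LogicalPlan.APPLICATION_ATTEMPT_ID);
    System.out.println("attempt = " + attempt);
  }
}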

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java b/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java
index 7935ce4..3b5a31e 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java
@@ -17,7 +17,8 @@ package com.datatorrent.stram.util;
 
 import java.io.Closeable;
 import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
+import java.util.EnumSet;
+
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -32,29 +33,13 @@ import org.slf4j.LoggerFactory;
  */
 public class FSJsonLineFile implements Closeable
 {
-  private final FileSystem fs;
   private final ObjectMapper objectMapper;
   private final FSDataOutputStream os;
   private static final Logger LOG = LoggerFactory.getLogger(FSJsonLineFile.class);
 
-  public FSJsonLineFile(Path path, FsPermission permission) throws IOException
+  public FSJsonLineFile(FileContext fileContext, Path path, FsPermission permission) throws IOException
   {
-    fs = FileSystem.newInstance(path.toUri(), new Configuration());
-    FSDataOutputStream myos;
-    if (fs.exists(path)) {
-      try {
-        // happens if not the first application attempt
-        myos = fs.append(path);
-      }
-      catch (IOException ex) {
-        LOG.warn("Caught exception (OK during unit test): {}", ex.getMessage());
-        myos = FileSystem.create(fs, path, permission);
-      }
-    }
-    else {
-      myos = FileSystem.create(fs, path, permission);
-    }
-    os = myos;
+    this.os = fileContext.create(path, EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), Options.CreateOpts.perms(permission));
     this.objectMapper = (new JSONSerializationProvider()).getContext(null);
   }
 
@@ -74,7 +59,6 @@ public class FSJsonLineFile implements Closeable
   public void close() throws IOException
   {
     os.close();
-    fs.close();
   }
 
 }
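On the FSJsonLineFile change above: with FileContext, the CREATE|APPEND flag combination creates the file when it is missing and appends when it exists, which replaces the old exists()/append()/create() dance. A self-contained sketch of that call under local-filesystem defaults (the path is a placeholder):

import java.nio.charset.StandardCharsets;
import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class CreateOrAppendSketch
{
  public static void main(String[] args) throws Exception
  {
    FileContext fc = FileContext.getFileContext(new Configuration());
    Path path = new Path("/tmp/operators_1.json"); // placeholder; parent dir must exist

    // CREATE|APPEND: create if missing, append if present, the same call the
    // new FSJsonLineFile constructor makes.
    try (FSDataOutputStream os = fc.create(path,
        EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
        Options.CreateOpts.perms(FsPermission.getDefault()))) {
      os.write("{\"name\":\"o1\",\"id\":1}\n".getBytes(StandardCharsets.UTF_8));
    }
  }
}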

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 6172d8a..ab2092a 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -391,6 +391,7 @@ public class StramRecoveryTest
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_ID, appId1);
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath1);
+    dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, 1);
     dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
     dag.addOperator("o1", StatsListeningOperator.class);
 
@@ -408,7 +409,6 @@ public class StramRecoveryTest
     PTOperator o1p1 = plan.getOperators(dag.getOperatorMeta("o1")).get(0);
     long[] ids = new FSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, new Configuration()).getWindowIds(o1p1.getId());
     Assert.assertArrayEquals(new long[] {o1p1.getRecoveryCheckpoint().getWindowId()}, ids);
-
     Assert.assertNull(o1p1.getContainer().getExternalId());
     // trigger journal write
     o1p1.getContainer().setExternalId("cid1");