You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:12:45 UTC

[14/50] incubator-apex-core git commit: APEX-35 #resolve Attempt to create directory before opening the meta file to write

APEX-35 #resolve Attempt to create directory before opening the meta file to write


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/45c7685a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/45c7685a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/45c7685a

Branch: refs/heads/master
Commit: 45c7685a0ca7a6cebcebaf0f3ab8b89788adc4b2
Parents: 19d6658
Author: David Yan <da...@datatorrent.com>
Authored: Thu Aug 6 13:31:05 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Thu Aug 6 17:51:52 2015 -0700

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        | 48 +++++++++-----------
 1 file changed, 21 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45c7685a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 0847f3c..6840288 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -19,6 +19,7 @@ import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -26,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 
+import com.datatorrent.netlet.util.DTThrowable;
 import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
@@ -173,6 +175,7 @@ public class StreamingContainerManager implements PlanContext
   private final Cache<Long, Object> commandResponse = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
   private long lastLatencyWarningTime;
   private transient ExecutorService poolExecutor;
+  private FileContext fileContext;
 
   //logic operator name to a queue of logical metrics. this gets cleared periodically
   private final Map<String, Queue<Pair<Long, Map<String, Object>>>> logicalMetrics = Maps.newConcurrentMap();
@@ -329,22 +332,8 @@ public class StreamingContainerManager implements PlanContext
       this.eventBus = new MBassador<StramEvent>(BusConfiguration.Default(1, 1, 1));
     }
     this.plan = new PhysicalPlan(dag, this);
-    setupWsClient();
-    setupRecording(enableEventRecording);
-    setupStringCodecs();
     this.journal = new Journal(this);
-    try {
-      saveMetaInfo();
-    } catch (IOException ex) {
-      LOG.error("Error saving meta info to DFS", ex);
-    }
-
-    try {
-      this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + "/containers"), new FsPermission((short)0644));
-      this.containerFile.append(getAppMasterContainerInfo());
-    } catch (IOException ex) {
-      LOG.warn("Caught exception when instantiating for container info file. Ignoring", ex);
-    }
+    init(enableEventRecording);
   }
 
   private StreamingContainerManager(CheckpointState checkpointedState, boolean enableEventRecording)
@@ -354,20 +343,26 @@ public class StreamingContainerManager implements PlanContext
     poolExecutor = Executors.newFixedThreadPool(4);
     this.plan = checkpointedState.physicalPlan;
     this.eventBus = new MBassador<StramEvent>(BusConfiguration.Default(1, 1, 1));
+    this.journal = new Journal(this);
+    init(enableEventRecording);
+  }
+
+  private void init(boolean enableEventRecording)
+  {
     setupWsClient();
     setupRecording(enableEventRecording);
     setupStringCodecs();
-    this.journal = new Journal(this);
+
     try {
+      Path file = new Path(this.vars.appPath);
+      URI uri = file.toUri();
+      Configuration config = new YarnConfiguration();
+      fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config);
       saveMetaInfo();
-    } catch (IOException ex) {
-      LOG.error("Error saving meta info to DFS", ex);
-    }
-    try {
-      this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + "/containers"), new FsPermission((short) 0644));
+      this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + "/containers"), FsPermission.getDefault());
       this.containerFile.append(getAppMasterContainerInfo());
     } catch (IOException ex) {
-      LOG.error("Caught exception when instantiating for container info file", ex);
+      DTThrowable.rethrow(ex);
     }
   }
 
@@ -858,9 +853,8 @@ public class StreamingContainerManager implements PlanContext
    */
   private void saveMetaInfo() throws IOException
   {
-    Path path = new Path(this.vars.appPath, APP_META_FILENAME + "." + System.nanoTime());
-    FileContext fc = FileContext.getFileContext(path.toUri());
-    try (FSDataOutputStream os = fc.create(path, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))) {
+    Path file = new Path(this.vars.appPath, APP_META_FILENAME + "." + System.nanoTime());
+    try (FSDataOutputStream os = fileContext.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent())) {
       JSONObject top = new JSONObject();
       JSONObject attributes = new JSONObject();
       for (Map.Entry<Attribute<?>, Object> entry : this.plan.getLogicalPlan().getAttributes().entrySet()) {
@@ -877,7 +871,7 @@ public class StreamingContainerManager implements PlanContext
       throw new RuntimeException(ex);
     }
     Path origPath = new Path(this.vars.appPath, APP_META_FILENAME);
-    fc.rename(path, origPath, Options.Rename.OVERWRITE);
+    fileContext.rename(file, origPath, Options.Rename.OVERWRITE);
   }
 
   public Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName)
@@ -1416,7 +1410,7 @@ public class StreamingContainerManager implements PlanContext
         try {
           FSJsonLineFile operatorFile = operatorFiles.get(ptOp.getId());
           if (operatorFile == null) {
-            operatorFiles.putIfAbsent(ptOp.getId(), new FSJsonLineFile(new Path(this.vars.appPath + "/operators/" + ptOp.getId()), new FsPermission((short)0644)));
+            operatorFiles.putIfAbsent(ptOp.getId(), new FSJsonLineFile(new Path(this.vars.appPath + "/operators/" + ptOp.getId()), FsPermission.getDefault()));
             operatorFile = operatorFiles.get(ptOp.getId());
           }
           JSONObject operatorInfo = new JSONObject();