You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:12:45 UTC
[14/50] incubator-apex-core git commit: APEX-35 #resolve Attempt to
create directory before opening the meta file to write
APEX-35 #resolve Attempt to create directory before opening the meta file to write
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/45c7685a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/45c7685a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/45c7685a
Branch: refs/heads/master
Commit: 45c7685a0ca7a6cebcebaf0f3ab8b89788adc4b2
Parents: 19d6658
Author: David Yan <da...@datatorrent.com>
Authored: Thu Aug 6 13:31:05 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Thu Aug 6 17:51:52 2015 -0700
----------------------------------------------------------------------
.../stram/StreamingContainerManager.java | 48 +++++++++-----------
1 file changed, 21 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45c7685a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 0847f3c..6840288 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -19,6 +19,7 @@ import java.io.*;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -26,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
+import com.datatorrent.netlet.util.DTThrowable;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
@@ -173,6 +175,7 @@ public class StreamingContainerManager implements PlanContext
private final Cache<Long, Object> commandResponse = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
private long lastLatencyWarningTime;
private transient ExecutorService poolExecutor;
+ private FileContext fileContext;
//logic operator name to a queue of logical metrics. this gets cleared periodically
private final Map<String, Queue<Pair<Long, Map<String, Object>>>> logicalMetrics = Maps.newConcurrentMap();
@@ -329,22 +332,8 @@ public class StreamingContainerManager implements PlanContext
this.eventBus = new MBassador<StramEvent>(BusConfiguration.Default(1, 1, 1));
}
this.plan = new PhysicalPlan(dag, this);
- setupWsClient();
- setupRecording(enableEventRecording);
- setupStringCodecs();
this.journal = new Journal(this);
- try {
- saveMetaInfo();
- } catch (IOException ex) {
- LOG.error("Error saving meta info to DFS", ex);
- }
-
- try {
- this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + "/containers"), new FsPermission((short)0644));
- this.containerFile.append(getAppMasterContainerInfo());
- } catch (IOException ex) {
- LOG.warn("Caught exception when instantiating for container info file. Ignoring", ex);
- }
+ init(enableEventRecording);
}
private StreamingContainerManager(CheckpointState checkpointedState, boolean enableEventRecording)
@@ -354,20 +343,26 @@ public class StreamingContainerManager implements PlanContext
poolExecutor = Executors.newFixedThreadPool(4);
this.plan = checkpointedState.physicalPlan;
this.eventBus = new MBassador<StramEvent>(BusConfiguration.Default(1, 1, 1));
+ this.journal = new Journal(this);
+ init(enableEventRecording);
+ }
+
+ private void init(boolean enableEventRecording)
+ {
setupWsClient();
setupRecording(enableEventRecording);
setupStringCodecs();
- this.journal = new Journal(this);
+
try {
+ Path file = new Path(this.vars.appPath);
+ URI uri = file.toUri();
+ Configuration config = new YarnConfiguration();
+ fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config);
saveMetaInfo();
- } catch (IOException ex) {
- LOG.error("Error saving meta info to DFS", ex);
- }
- try {
- this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + "/containers"), new FsPermission((short) 0644));
+ this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + "/containers"), FsPermission.getDefault());
this.containerFile.append(getAppMasterContainerInfo());
} catch (IOException ex) {
- LOG.error("Caught exception when instantiating for container info file", ex);
+ DTThrowable.rethrow(ex);
}
}
@@ -858,9 +853,8 @@ public class StreamingContainerManager implements PlanContext
*/
private void saveMetaInfo() throws IOException
{
- Path path = new Path(this.vars.appPath, APP_META_FILENAME + "." + System.nanoTime());
- FileContext fc = FileContext.getFileContext(path.toUri());
- try (FSDataOutputStream os = fc.create(path, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))) {
+ Path file = new Path(this.vars.appPath, APP_META_FILENAME + "." + System.nanoTime());
+ try (FSDataOutputStream os = fileContext.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent())) {
JSONObject top = new JSONObject();
JSONObject attributes = new JSONObject();
for (Map.Entry<Attribute<?>, Object> entry : this.plan.getLogicalPlan().getAttributes().entrySet()) {
@@ -877,7 +871,7 @@ public class StreamingContainerManager implements PlanContext
throw new RuntimeException(ex);
}
Path origPath = new Path(this.vars.appPath, APP_META_FILENAME);
- fc.rename(path, origPath, Options.Rename.OVERWRITE);
+ fileContext.rename(file, origPath, Options.Rename.OVERWRITE);
}
public Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName)
@@ -1416,7 +1410,7 @@ public class StreamingContainerManager implements PlanContext
try {
FSJsonLineFile operatorFile = operatorFiles.get(ptOp.getId());
if (operatorFile == null) {
- operatorFiles.putIfAbsent(ptOp.getId(), new FSJsonLineFile(new Path(this.vars.appPath + "/operators/" + ptOp.getId()), new FsPermission((short)0644)));
+ operatorFiles.putIfAbsent(ptOp.getId(), new FSJsonLineFile(new Path(this.vars.appPath + "/operators/" + ptOp.getId()), FsPermission.getDefault()));
operatorFile = operatorFiles.get(ptOp.getId());
}
JSONObject operatorInfo = new JSONObject();