You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:13:05 UTC
[34/50] incubator-apex-core git commit: APEX-37 Added
APPLICATION_ATTEMPT_ID attribute and changed operator and container history
files so that different application attempts write to separate HDFS files
APEX-37 Added APPLICATION_ATTEMPT_ID attribute and changed the operator and container history files so that different application attempts write to separate HDFS files
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b8c0b4cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b8c0b4cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b8c0b4cc
Branch: refs/heads/master
Commit: b8c0b4cc06b1d3b4a94c9027da0cb44da83f4c28
Parents: 9d08532
Author: David Yan <da...@datatorrent.com>
Authored: Thu Aug 6 18:35:32 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Tue Aug 18 13:45:14 2015 -0700
----------------------------------------------------------------------
.../stram/StreamingAppMasterService.java | 2 +-
.../stram/StreamingContainerManager.java | 66 +++++++++++---------
.../stram/plan/logical/LogicalPlan.java | 5 ++
.../datatorrent/stram/util/FSJsonLineFile.java | 24 ++-----
.../datatorrent/stram/StramRecoveryTest.java | 2 +-
5 files changed, 46 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 5246c9e..98c78de 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -511,7 +511,7 @@ public class StreamingAppMasterService extends CompositeService
if (dag.isDebug()) {
dumpOutDebugInfo();
}
-
+ dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(dag.assertAppPath(), conf);
this.dnmgr = StreamingContainerManager.getInstance(recoveryHandler, dag, true);
dag = this.dnmgr.getLogicalPlan();
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 64850f5..6e0f3f5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -127,6 +127,8 @@ public class StreamingContainerManager implements PlanContext
private final static Logger LOG = LoggerFactory.getLogger(StreamingContainerManager.class);
public final static String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
public final static String BUILTIN_APPDATA_URL = "builtin";
+ public final static String CONTAINERS_INFO_FILENAME_FORMAT = "containers_%d.json";
+ public final static String OPERATORS_INFO_FILENAME_FORMAT = "operators_%d.json";
public final static String APP_META_FILENAME = "meta.json";
public final static String APP_META_KEY_ATTRIBUTES = "attributes";
public final static String APP_META_KEY_METRICS = "metrics";
@@ -205,7 +207,7 @@ public class StreamingContainerManager implements PlanContext
};
private FSJsonLineFile containerFile;
- private final ConcurrentMap<Integer, FSJsonLineFile> operatorFiles = Maps.newConcurrentMap();
+ private FSJsonLineFile operatorFile;
private final long startTime = System.currentTimeMillis();
@@ -359,8 +361,11 @@ public class StreamingContainerManager implements PlanContext
Configuration config = new YarnConfiguration();
fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config);
saveMetaInfo();
- this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + "/containers"), FsPermission.getDefault());
+ String fileName = String.format(CONTAINERS_INFO_FILENAME_FORMAT, plan.getLogicalPlan().getValue(LogicalPlan.APPLICATION_ATTEMPT_ID));
+ this.containerFile = new FSJsonLineFile(fileContext, new Path(this.vars.appPath, fileName), FsPermission.getDefault());
this.containerFile.append(getAppMasterContainerInfo());
+ fileName = String.format(OPERATORS_INFO_FILENAME_FORMAT, plan.getLogicalPlan().getValue(LogicalPlan.APPLICATION_ATTEMPT_ID));
+ this.operatorFile = new FSJsonLineFile(fileContext, new Path(this.vars.appPath, fileName), FsPermission.getDefault());
} catch (IOException ex) {
throw DTThrowable.wrapIfChecked(ex);
}
@@ -490,9 +495,7 @@ public class StreamingContainerManager implements PlanContext
}
IOUtils.closeQuietly(containerFile);
- for (FSJsonLineFile operatorFile : operatorFiles.values()) {
- IOUtils.closeQuietly(operatorFile);
- }
+ IOUtils.closeQuietly(operatorFile);
if(poolExecutor != null) {
poolExecutor.shutdown();
}
@@ -854,8 +857,7 @@ public class StreamingContainerManager implements PlanContext
private void saveMetaInfo() throws IOException
{
Path file = new Path(this.vars.appPath, APP_META_FILENAME + "." + System.nanoTime());
- try (FSDataOutputStream os = fileContext.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent())) {
- JSONObject top = new JSONObject();
+ try (FSDataOutputStream os = fileContext.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent())) { JSONObject top = new JSONObject();
JSONObject attributes = new JSONObject();
for (Map.Entry<Attribute<?>, Object> entry : this.plan.getLogicalPlan().getAttributes().entrySet()) {
attributes.put(entry.getKey().getSimpleName(), entry.getValue());
@@ -1379,7 +1381,7 @@ public class StreamingContainerManager implements PlanContext
{
long currentTimeMillis = clock.getTime();
- StreamingContainerAgent sca = this.containers.get(heartbeat.getContainerId());
+ final StreamingContainerAgent sca = this.containers.get(heartbeat.getContainerId());
if (sca == null || sca.container.getState() == PTContainer.State.KILLED) {
// could be orphaned container that was replaced and needs to terminate
LOG.error("Unknown container {}", heartbeat.getContainerId());
@@ -1395,34 +1397,35 @@ public class StreamingContainerManager implements PlanContext
sca.container.bufferServerAddress = InetSocketAddress.createUnresolved(heartbeat.bufferServerHost, heartbeat.bufferServerPort);
LOG.info("Container {} buffer server: {}", sca.container.getExternalId(), sca.container.bufferServerAddress);
}
- long containerStartTime = System.currentTimeMillis();
+ final long containerStartTime = System.currentTimeMillis();
sca.container.setState(PTContainer.State.ACTIVE);
sca.container.setStartedTime(containerStartTime);
sca.container.setFinishedTime(-1);
sca.jvmName = heartbeat.jvmName;
- try {
- containerFile.append(sca.getContainerInfo());
- }
- catch (IOException ex) {
- LOG.warn("Cannot write to container file");
- }
- for (PTOperator ptOp : sca.container.getOperators()) {
- try {
- FSJsonLineFile operatorFile = operatorFiles.get(ptOp.getId());
- if (operatorFile == null) {
- operatorFiles.putIfAbsent(ptOp.getId(), new FSJsonLineFile(new Path(this.vars.appPath + "/operators/" + ptOp.getId()), FsPermission.getDefault()));
- operatorFile = operatorFiles.get(ptOp.getId());
+ poolExecutor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try {
+ containerFile.append(sca.getContainerInfo());
+ } catch (IOException ex) {
+ LOG.warn("Cannot write to container file");
+ }
+ for (PTOperator ptOp : sca.container.getOperators()) {
+ try {
+ JSONObject operatorInfo = new JSONObject();
+ operatorInfo.put("name", ptOp.getName());
+ operatorInfo.put("id", ptOp.getId());
+ operatorInfo.put("container", sca.container.getExternalId());
+ operatorInfo.put("startTime", containerStartTime);
+ operatorFile.append(operatorInfo);
+ } catch (IOException | JSONException ex) {
+ LOG.warn("Cannot write to operator file: ", ex);
+ }
}
- JSONObject operatorInfo = new JSONObject();
- operatorInfo.put("name", ptOp.getName());
- operatorInfo.put("container", sca.container.getExternalId());
- operatorInfo.put("startTime", containerStartTime);
- operatorFile.append(operatorInfo);
- }
- catch (Exception ex) {
- LOG.warn("Cannot write to operator file: ", ex);
}
- }
+ });
}
if (heartbeat.restartRequested) {
@@ -2823,9 +2826,10 @@ public class StreamingContainerManager implements PlanContext
scm = new StreamingContainerManager(dag, enableEventRecording, new SystemClock());
}
else {
- scm = new StreamingContainerManager(checkpointedState, enableEventRecording);
// find better way to support final transient members
PhysicalPlan plan = checkpointedState.physicalPlan;
+ plan.getLogicalPlan().setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, dag.getAttributes().get(LogicalPlan.APPLICATION_ATTEMPT_ID));
+ scm = new StreamingContainerManager(checkpointedState, enableEventRecording);
for (Field f : plan.getClass().getDeclaredFields()) {
if (f.getType() == PlanContext.class) {
f.setAccessible(true);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index d140d17..2d088b8 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -130,6 +130,11 @@ public class LogicalPlan implements Serializable, DAG
*/
public static Attribute<Integer> CONTAINERS_MAX_COUNT = new Attribute<Integer>(Integer.MAX_VALUE);
+ /**
+ * The application attempt ID from YARN
+ */
+ public static Attribute<Integer> APPLICATION_ATTEMPT_ID = new Attribute<>(1);
+
static {
Attribute.AttributeMap.AttributeInitializer.initialize(LogicalPlan.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java b/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java
index 7935ce4..3b5a31e 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java
@@ -17,7 +17,8 @@ package com.datatorrent.stram.util;
import java.io.Closeable;
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
+import java.util.EnumSet;
+
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.codehaus.jackson.map.ObjectMapper;
@@ -32,29 +33,13 @@ import org.slf4j.LoggerFactory;
*/
public class FSJsonLineFile implements Closeable
{
- private final FileSystem fs;
private final ObjectMapper objectMapper;
private final FSDataOutputStream os;
private static final Logger LOG = LoggerFactory.getLogger(FSJsonLineFile.class);
- public FSJsonLineFile(Path path, FsPermission permission) throws IOException
+ public FSJsonLineFile(FileContext fileContext, Path path, FsPermission permission) throws IOException
{
- fs = FileSystem.newInstance(path.toUri(), new Configuration());
- FSDataOutputStream myos;
- if (fs.exists(path)) {
- try {
- // happens if not the first application attempt
- myos = fs.append(path);
- }
- catch (IOException ex) {
- LOG.warn("Caught exception (OK during unit test): {}", ex.getMessage());
- myos = FileSystem.create(fs, path, permission);
- }
- }
- else {
- myos = FileSystem.create(fs, path, permission);
- }
- os = myos;
+ this.os = fileContext.create(path, EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), Options.CreateOpts.perms(permission));
this.objectMapper = (new JSONSerializationProvider()).getContext(null);
}
@@ -74,7 +59,6 @@ public class FSJsonLineFile implements Closeable
public void close() throws IOException
{
os.close();
- fs.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 6172d8a..ab2092a 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -391,6 +391,7 @@ public class StramRecoveryTest
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.APPLICATION_ID, appId1);
dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath1);
+ dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, 1);
dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
dag.addOperator("o1", StatsListeningOperator.class);
@@ -408,7 +409,6 @@ public class StramRecoveryTest
PTOperator o1p1 = plan.getOperators(dag.getOperatorMeta("o1")).get(0);
long[] ids = new FSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, new Configuration()).getWindowIds(o1p1.getId());
Assert.assertArrayEquals(new long[] {o1p1.getRecoveryCheckpoint().getWindowId()}, ids);
-
Assert.assertNull(o1p1.getContainer().getExternalId());
// trigger journal write
o1p1.getContainer().setExternalId("cid1");