You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/04/10 22:27:00 UTC
[1/2] apex-core git commit: APEXCORE-575 Improve application restart time.
Repository: apex-core
Updated Branches:
refs/heads/master 412a3bd81 -> 88bf33627
APEXCORE-575 Improve application restart time.
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8825f5fa
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8825f5fa
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8825f5fa
Branch: refs/heads/master
Commit: 8825f5fa3e22beaf360f111f37ec0c4dba24ad1c
Parents: 9054fd2
Author: Tushar R. Gosavi <tu...@apache.org>
Authored: Mon Feb 27 17:23:20 2017 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Tue Apr 4 01:03:21 2017 +0530
----------------------------------------------------------------------
.../common/util/AsyncFSStorageAgent.java | 14 +-
.../datatorrent/common/util/FSStorageAgent.java | 11 +
.../apex/common/util/AsyncStorageAgent.java | 54 +++++
.../apex/common/util/CascadeStorageAgent.java | 202 +++++++++++++++++++
.../common/util/CascadeStorageAgentTest.java | 116 +++++++++++
.../java/com/datatorrent/stram/StramClient.java | 12 +-
.../stram/StreamingContainerManager.java | 49 +++--
.../java/com/datatorrent/stram/engine/Node.java | 17 +-
.../stram/plan/physical/PhysicalPlan.java | 9 +-
.../datatorrent/stram/StramRecoveryTest.java | 45 ++++-
10 files changed, 493 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index 24d850e..0c389a4 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -30,6 +30,7 @@ import java.util.EnumSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.common.util.AsyncStorageAgent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -45,7 +46,7 @@ import com.google.common.base.Throwables;
*
* @since 3.1.0
*/
-public class AsyncFSStorageAgent extends FSStorageAgent
+public class AsyncFSStorageAgent extends FSStorageAgent implements AsyncStorageAgent
{
private final transient Configuration conf;
private transient volatile String localBasePath;
@@ -146,6 +147,16 @@ public class AsyncFSStorageAgent extends FSStorageAgent
}
@Override
+  public void finalize(int operatorId, long windowId) throws IOException
+  {
+    // With syncCheckpoint enabled, save() already placed the checkpoint in HDFS;
+    // otherwise the saved checkpoint still has to be pushed there via copyToHDFS.
+    if (!isSyncCheckpoint()) {
+      copyToHDFS(operatorId, windowId);
+    }
+  }
+
+ @Override
public Object readResolve() throws ObjectStreamException
{
AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(this.path, null);
@@ -153,6 +164,7 @@ public class AsyncFSStorageAgent extends FSStorageAgent
return asyncFSStorageAgent;
}
+ @Override
public boolean isSyncCheckpoint()
{
return syncCheckpoint;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
index fe90b86..b5a43fe 100644
--- a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
@@ -18,6 +18,7 @@
*/
package com.datatorrent.common.util;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectStreamException;
@@ -153,6 +154,16 @@ public class FSStorageAgent implements StorageAgent, Serializable
public long[] getWindowIds(int operatorId) throws IOException
{
Path lPath = new Path(path + Path.SEPARATOR + String.valueOf(operatorId));
+ try {
+ FileStatus status = fileContext.getFileStatus(lPath);
+ if (!status.isDirectory()) {
+ throw new IOException("Checkpoint location is not a directory ");
+ }
+ } catch (FileNotFoundException ex) {
+ // During initialization this directory may not exists.
+ // return an empty array.
+ return new long[0];
+ }
RemoteIterator<FileStatus> fileStatusRemoteIterator = fileContext.listStatus(lPath);
if (!fileStatusRemoteIterator.hasNext()) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
new file mode 100644
index 0000000..632a7f2
--- /dev/null
+++ b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.common.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.StorageAgent;
+
+/**
+ * Storage agent which can take checkpoints asynchronously.
+ * An AsyncStorageAgent enables quick checkpoints by taking a local snapshot of an operator
+ * and unblocking the operator to process more data, while the storage engine pushes the local
+ * snapshot to a distributed or globally accessible location for recovery.
+ */
+@InterfaceStability.Evolving
+public interface AsyncStorageAgent extends StorageAgent
+{
+  /**
+   * Makes the checkpoint for the given window id final, i.e. after this method returns,
+   * the checkpoint is accessible for recovery.
+   *
+   * <p>This may be called even when {@link #isSyncCheckpoint()} returns true, in which case
+   * an implementation whose checkpoint is already final should simply return.</p>
+   *
+   * <p>Note: this is an overload of, and unrelated to, {@link Object#finalize()}.</p>
+   *
+   * @param operatorId id of the operator whose checkpoint is finalized
+   * @param windowId window id of the checkpoint to finalize
+   * @throws IOException if finalizing the checkpoint fails
+   */
+  void finalize(int operatorId, long windowId) throws IOException;
+
+  /**
+   * Checks if the storage agent is configured to take synchronous checkpoints.
+   *
+   * @return true if the storage agent is configured to take synchronous checkpoints,
+   *         false otherwise
+   */
+  boolean isSyncCheckpoint();
+
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java b/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
new file mode 100644
index 0000000..d6fec8e
--- /dev/null
+++ b/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.common.util;
+
+import java.io.IOException;
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.StorageAgent;
+
+/**
+ * A StorageAgent which chains two StorageAgent. It use the current storage-agent to store
+ * the checkpoint, and use the parent agent to read old checkpoints. For application having
+ * large number of physical operators, the size and number of files to be copied could be
+ * large impacting application restart time. This storage-agent is used during application
+ * restart to avoiding copying checkpoints from old application directory to improve application
+ * restart time.
+ */
+public class CascadeStorageAgent implements StorageAgent, AsyncStorageAgent, Serializable
+{
+ private static final Logger logger = LoggerFactory.getLogger(CascadeStorageAgent.class);
+ private final StorageAgent parent;
+ private final StorageAgent current;
+ private final transient Map<Integer, long[]> oldOperatorToWindowIdsMap;
+
+ public CascadeStorageAgent(StorageAgent parent, StorageAgent current)
+ {
+ this.parent = parent;
+ this.current = current;
+ oldOperatorToWindowIdsMap = Maps.newConcurrentMap();
+ }
+
+ /**
+ * does the checkpoint belong to parent
+ */
+ private boolean isCheckpointFromParent(int operatorId, long wid) throws IOException
+ {
+ long[] wids = getParentWindowIds(operatorId);
+ if (wids.length != 0) {
+ return (wid <= wids[wids.length - 1]);
+ }
+ return false;
+ }
+
+ /**
+ * Return window-id of checkpoints available in old storage agent. This function
+ * will call getWindowIds of old storage agent only once for the fist time, and
+ * return cached data for next calls for same operator.
+ *
+ * @param operatorId
+ * @return
+ * @throws IOException
+ */
+ private long[] getParentWindowIds(int operatorId) throws IOException
+ {
+ long[] oldWindowIds = oldOperatorToWindowIdsMap.get(operatorId);
+ if (oldWindowIds == null) {
+ oldWindowIds = parent.getWindowIds(operatorId);
+ if (oldWindowIds == null) {
+ oldWindowIds = new long[0];
+ }
+ Arrays.sort(oldWindowIds);
+ oldOperatorToWindowIdsMap.put(operatorId, oldWindowIds);
+ logger.debug("CascadeStorageAgent window ids from old storage agent op {} wids {}", operatorId, Arrays.toString(oldWindowIds));
+ }
+ return oldWindowIds;
+ }
+
+ /**
+ * Save object in current storage agent. This should not modify old storage agent
+ * in any way.
+ *
+ * @param object - The operator whose state needs to be saved.
+ * @param operatorId - Identifier of the operator.
+ * @param windowId - Identifier for the specific state of the operator.
+ * @throws IOException
+ */
+ @Override
+ public void save(Object object, int operatorId, long windowId) throws IOException
+ {
+ current.save(object, operatorId, windowId);
+ }
+
+ /**
+ * Delete old checkpoints from the storage agent.
+ *
+ * The checkpoints are deleted from current directory if it is present in current
+ * storage agent. and cached state for old storage agent is removed.
+ *
+ * @param operatorId
+ * @param windowId
+ * @throws IOException
+ */
+ @Override
+ public void delete(int operatorId, long windowId) throws IOException
+ {
+ if (!isCheckpointFromParent(operatorId, windowId)) {
+ current.delete(operatorId, windowId);
+ }
+ }
+
+ /**
+ * Load checkpoint from storage agents. Do a basic comparision of windowIds
+ * to check the storage agent which has the checkpoint.
+ *
+ * @param operatorId Id for which the object was previously saved
+ * @param windowId WindowId for which the object was previously saved
+ * @return
+ * @throws IOException
+ */
+ @Override
+ public Object load(int operatorId, long windowId) throws IOException
+ {
+ long[] oldWindowIds = getParentWindowIds(operatorId);
+ if (oldWindowIds.length >= 1 && windowId <= oldWindowIds[oldWindowIds.length - 1]) {
+ return parent.load(operatorId, windowId);
+ }
+ return current.load(operatorId, windowId);
+ }
+
+ @Override
+ public long[] getWindowIds(int operatorId) throws IOException
+ {
+ long[] currentIds = current.getWindowIds(operatorId);
+ long[] oldWindowIds = getParentWindowIds(operatorId);
+ return merge(currentIds, oldWindowIds);
+ }
+
+ private static final long[] EMPTY_LONG_ARRAY = new long[0];
+ private long[] merge(long[] currentIds, long[] oldWindowIds)
+ {
+ if (currentIds == null && oldWindowIds == null) {
+ return EMPTY_LONG_ARRAY;
+ }
+ if (currentIds == null) {
+ return oldWindowIds;
+ }
+ if (oldWindowIds == null) {
+ return currentIds;
+ }
+ long[] mergedArray = new long[currentIds.length + oldWindowIds.length];
+ System.arraycopy(currentIds, 0, mergedArray, 0, currentIds.length);
+ System.arraycopy(oldWindowIds, 0, mergedArray, currentIds.length, oldWindowIds.length);
+ Arrays.sort(mergedArray);
+ return mergedArray;
+ }
+
+ @Override
+ public void finalize(int operatorId, long windowId) throws IOException
+ {
+ if (current instanceof AsyncStorageAgent) {
+ ((AsyncStorageAgent)current).finalize(operatorId, windowId);
+ }
+ }
+
+ @Override
+ public boolean isSyncCheckpoint()
+ {
+ if (parent instanceof AsyncStorageAgent) {
+ return ((AsyncStorageAgent)parent).isSyncCheckpoint();
+ }
+ return true;
+ }
+
+ public Object readResolve() throws ObjectStreamException
+ {
+ return new CascadeStorageAgent(parent, current);
+ }
+
+ public StorageAgent getCurrentStorageAgent()
+ {
+ return current;
+ }
+
+ public StorageAgent getParentStorageAgent()
+ {
+ return parent;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
new file mode 100644
index 0000000..40f24f0
--- /dev/null
+++ b/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.common.util;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.apex.common.util.CascadeStorageAgent;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DAG;
+
+public class CascadeStorageAgentTest
+{
+
+  static class TestMeta extends TestWatcher
+  {
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      try {
+        FileUtils.forceMkdir(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      // NOTE(review): this attribute map is not read by any test in this class.
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  /** Returns true if the checkpoint file of the given operator and window exists under appPath. */
+  private static boolean checkpointExists(FileContext fileContext, String appPath, int operatorId, long windowId)
+      throws IOException
+  {
+    return fileContext.util().exists(new Path(appPath + "/" + operatorId + "/" + windowId));
+  }
+
+  @Test
+  public void testSingleIndirection() throws IOException
+  {
+    String oldAppPath = testMeta.applicationPath;
+    FSStorageAgent storageAgent = new FSStorageAgent(oldAppPath, null);
+    storageAgent.save("1", 1, 1);
+    storageAgent.save("2", 1, 2);
+    storageAgent.save("3", 2, 1);
+
+    String newAppPath = oldAppPath + ".new";
+    CascadeStorageAgent cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath, null));
+    long[] windowIds = cascade.getWindowIds(1);
+    Assert.assertArrayEquals("Returned window ids ", new long[]{1L, 2L}, windowIds);
+
+    windowIds = cascade.getWindowIds(2);
+    Assert.assertArrayEquals("Returned window ids ", new long[]{1L}, windowIds);
+
+    /* checkpoints taken before the restart are read through the parent */
+    Assert.assertEquals("operator 1 window 2 loaded from old directory", "2", cascade.load(1, 2L));
+
+    /* save should happen to new location */
+    cascade.save("4", 1, 4);
+    FileContext fileContext = FileContext.getFileContext();
+    Assert.assertFalse("operator 1 window 4 file does not exist in old directory",
+        checkpointExists(fileContext, oldAppPath, 1, 4));
+    Assert.assertTrue("operator 1 window 4 file exists in new directory",
+        checkpointExists(fileContext, newAppPath, 1, 4));
+    Assert.assertEquals("operator 1 window 4 loaded from new directory", "4", cascade.load(1, 4L));
+
+    // check for delete,
+    // delete for old checkpoint should be ignored
+    cascade.save("5", 1, 5);
+    cascade.delete(1, 2L);
+    Assert.assertTrue("operator 1 window 2 file exists in old directory",
+        checkpointExists(fileContext, oldAppPath, 1, 2));
+    cascade.delete(1, 4L);
+    Assert.assertFalse("operator 1 window 4 file does not exist in new directory",
+        checkpointExists(fileContext, newAppPath, 1, 4));
+
+    /* chaining of storage agent */
+    String latestAppPath = oldAppPath + ".latest";
+    cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath, null));
+    CascadeStorageAgent latest = new CascadeStorageAgent(cascade, new FSStorageAgent(latestAppPath, null));
+    windowIds = latest.getWindowIds(1);
+    Assert.assertArrayEquals("Window ids ", new long[] {1, 2, 5}, windowIds);
+
+    latest.save("6", 1, 6);
+    Assert.assertFalse("operator 1 window 6 file does not exist in old directory",
+        checkpointExists(fileContext, oldAppPath, 1, 6));
+    Assert.assertFalse("operator 1 window 6 file does not exist in intermediate directory",
+        checkpointExists(fileContext, newAppPath, 1, 6));
+    Assert.assertTrue("operator 1 window 6 file exists in latest directory",
+        checkpointExists(fileContext, latestAppPath, 1, 6));
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index dad42e3..b280aad 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -257,6 +257,7 @@ public class StramClient
public void copyInitialState(Path origAppDir) throws IOException
{
// locate previous snapshot
+ long copyStart = System.currentTimeMillis();
String newAppDir = this.dag.assertAppPath();
FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(origAppDir.toString(), conf);
@@ -284,6 +285,7 @@ public class StramClient
logOs.close();
logIs.close();
+ List<String> excludeDirs = Arrays.asList(LogicalPlan.SUBDIR_CHECKPOINTS, LogicalPlan.SUBDIR_EVENTS, LogicalPlan.SUBDIR_STATS);
// copy sub directories that are not present in target
FileStatus[] lFiles = fs.listStatus(origAppDir);
@@ -298,19 +300,19 @@ public class StramClient
String newAppDirPath = Path.getPathWithoutSchemeAndAuthority(new Path(newAppDir)).toString();
for (FileStatus f : lFiles) {
- if (f.isDirectory()) {
+ if (f.isDirectory() && !excludeDirs.contains(f.getPath().getName())) {
String targetPath = f.getPath().toString().replace(origAppDirPath, newAppDirPath);
if (!fs.exists(new Path(targetPath))) {
- LOG.debug("Copying {} to {}", f.getPath(), targetPath);
+ LOG.debug("Copying {} size {} to {}", f.getPath(), f.getLen(), targetPath);
+ long start = System.currentTimeMillis();
FileUtil.copy(fs, f.getPath(), fs, new Path(targetPath), false, conf);
- //FSUtil.copy(fs, f, fs, new Path(targetPath), false, false, conf);
+ LOG.debug("Copying {} to {} took {} ms", f.getPath(), f.getLen(), targetPath, System.currentTimeMillis() - start);
} else {
LOG.debug("Ignoring {} as it already exists under {}", f.getPath(), targetPath);
- //FSUtil.setPermission(fs, new Path(targetPath), new FsPermission((short)0777));
}
}
}
-
+ LOG.info("Copying initial state took {} ms", System.currentTimeMillis() - copyStart);
}
/**
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index c68df14..51e85f7 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -65,6 +65,7 @@ import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.common.util.CascadeStorageAgent;
import org.apache.apex.engine.plugin.ApexPluginDispatcher;
import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
import org.apache.commons.io.IOUtils;
@@ -3238,23 +3239,43 @@ public class StreamingContainerManager implements PlanContext
this.finals = new FinalVars(finals, lp);
StorageAgent sa = lp.getValue(OperatorContext.STORAGE_AGENT);
- if (sa instanceof AsyncFSStorageAgent) {
- // replace the default storage agent, if present
- AsyncFSStorageAgent fssa = (AsyncFSStorageAgent)sa;
- if (fssa.path.contains(oldAppId)) {
- fssa = new AsyncFSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
- lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa);
- }
- } else if (sa instanceof FSStorageAgent) {
- // replace the default storage agent, if present
- FSStorageAgent fssa = (FSStorageAgent)sa;
- if (fssa.path.contains(oldAppId)) {
- fssa = new FSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
- lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa);
- }
+ lp.setAttribute(OperatorContext.STORAGE_AGENT, updateStorageAgent(sa, oldAppId, appId, conf));
+ }
+ }
+
+  /**
+   * Returns the storage agent to use after restarting from application {@code oldAppId}.
+   * If the file-system based agent (or, for a {@link CascadeStorageAgent}, its current agent)
+   * stores checkpoints under the old application directory, it is wrapped in a new
+   * {@link CascadeStorageAgent}. New checkpoints then go to the new application directory, while
+   * existing checkpoints are still read from the old one. Otherwise {@code sa} is returned unchanged.
+   */
+  private static StorageAgent updateStorageAgent(StorageAgent sa, String oldAppId, String appId, Configuration conf)
+  {
+    if (sa instanceof FSStorageAgent) {
+      // also covers AsyncFSStorageAgent, which extends FSStorageAgent
+      StorageAgent newAgent = updateFSStorageAgent(sa, oldAppId, appId, conf);
+      if (newAgent != sa) {
+        return new CascadeStorageAgent(sa, newAgent);
       }
+    } else if (sa instanceof CascadeStorageAgent) {
+      CascadeStorageAgent csa = (CascadeStorageAgent)sa;
+      StorageAgent currentStorageAgent = csa.getCurrentStorageAgent();
+      StorageAgent newAgent = updateFSStorageAgent(currentStorageAgent, oldAppId, appId, conf);
+      // Only add another level when the current agent was actually relocated. Wrapping an
+      // unchanged agent would place it under a parent that already reports all of its window
+      // ids, so the new cascade's delete() would skip every one of its existing checkpoints.
+      if (newAgent != currentStorageAgent) {
+        return new CascadeStorageAgent(csa, newAgent);
+      }
     }
+    return sa;
+  }
+
+  /**
+   * Returns an updated file-system based storage agent. The agent is replaced only when its
+   * checkpoint path contains {@code oldAppId}, i.e. when it stores checkpoints in the
+   * application directory; any other agent is returned unchanged.
+   *
+   * @param sa storage agent configured for the previous application
+   * @param oldAppId id of the application being restarted from
+   * @param appId id of the new application
+   * @param conf configuration passed to the new storage agent
+   * @return a new agent pointing at the new application directory, or {@code sa} itself
+   */
+  private static StorageAgent updateFSStorageAgent(StorageAgent sa, String oldAppId, String appId, Configuration conf)
+  {
+    if (sa instanceof AsyncFSStorageAgent) {
+      AsyncFSStorageAgent fssa = (AsyncFSStorageAgent)sa;
+      if (fssa.path.contains(oldAppId)) {
+        AsyncFSStorageAgent newAgent = new AsyncFSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
+        // the (path, conf) constructor cannot carry the user's sync-checkpoint choice over
+        newAgent.setSyncCheckpoint(fssa.isSyncCheckpoint());
+        return newAgent;
+      }
+    } else if (sa instanceof FSStorageAgent) {
+      FSStorageAgent fssa = (FSStorageAgent)sa;
+      if (fssa.path.contains(oldAppId)) {
+        return new FSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
+      }
+    }
+    return sa;
   }
public interface RecoveryHandler
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index d779afe..c84a249 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.common.util.AsyncStorageAgent;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.base.Throwables;
@@ -70,7 +71,6 @@ import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StatsListener.OperatorRequest;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.bufferserver.util.Codec;
-import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.OperatorDeployInfo;
@@ -519,16 +519,16 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
checkpointStats = new Stats.CheckpointStats();
checkpointStats.checkpointStartTime = System.currentTimeMillis();
ba.save(operator, id, windowId);
- if (ba instanceof AsyncFSStorageAgent) {
- AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent)ba;
- if (!asyncFSStorageAgent.isSyncCheckpoint()) {
+ if (ba instanceof AsyncStorageAgent) {
+ AsyncStorageAgent asyncStorageAgent = (AsyncStorageAgent)ba;
+ if (!asyncStorageAgent.isSyncCheckpoint()) {
if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
CheckpointWindowInfo checkpointWindowInfo = new CheckpointWindowInfo();
checkpointWindowInfo.windowId = windowId;
checkpointWindowInfo.applicationWindowCount = applicationWindowCount;
checkpointWindowInfo.checkpointWindowCount = checkpointWindowCount;
CheckpointHandler checkpointHandler = new CheckpointHandler();
- checkpointHandler.agent = asyncFSStorageAgent;
+ checkpointHandler.agent = asyncStorageAgent;
checkpointHandler.operatorId = id;
checkpointHandler.windowId = windowId;
checkpointHandler.stats = checkpointStats;
@@ -539,7 +539,7 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
checkpointStats = null;
return;
} else {
- asyncFSStorageAgent.copyToHDFS(id, windowId);
+ asyncStorageAgent.finalize(id, windowId);
}
}
}
@@ -680,8 +680,7 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
private class CheckpointHandler implements Callable<Stats.CheckpointStats>
{
-
- public AsyncFSStorageAgent agent;
+ public AsyncStorageAgent agent;
public int operatorId;
public long windowId;
public Stats.CheckpointStats stats;
@@ -689,7 +688,7 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
@Override
public Stats.CheckpointStats call() throws Exception
{
- agent.copyToHDFS(id, windowId);
+ agent.finalize(id, windowId);
stats.checkpointTime = System.currentTimeMillis() - stats.checkpointStartTime;
return stats;
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index ce22bfd..7547654 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.common.util.AsyncStorageAgent;
import org.apache.commons.lang.StringUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -68,7 +69,6 @@ import com.datatorrent.api.StatsListener.OperatorRequest;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.Journal.Recoverable;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.StramEvent;
@@ -1226,11 +1226,8 @@ public class PhysicalPlan implements Serializable
long windowId = oper.isOperatorStateLess() ? Stateless.WINDOW_ID : checkpoint.windowId;
StorageAgent agent = oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT);
agent.save(oo, oper.id, windowId);
- if (agent instanceof AsyncFSStorageAgent) {
- AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent)agent;
- if (!asyncFSStorageAgent.isSyncCheckpoint()) {
- asyncFSStorageAgent.copyToHDFS(oper.id, windowId);
- }
+ if (agent instanceof AsyncStorageAgent) {
+ ((AsyncStorageAgent)agent).finalize(oper.id, windowId);
}
} catch (IOException e) {
// inconsistent state, no recovery option, requires shutdown
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 645598d..2f46049 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -44,6 +44,7 @@ import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.common.util.CascadeStorageAgent;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
@@ -428,6 +429,7 @@ public class StramRecoveryTest
o1p1.getContainer().setExternalId("cid1");
scm.writeJournal(o1p1.getContainer().getSetContainerState());
+ /* simulate application restart from app1 */
dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath2);
dag.setAttribute(LogicalPlan.APPLICATION_ID, appId2);
@@ -447,9 +449,50 @@ public class StramRecoveryTest
o1p1 = plan.getOperators(dag.getOperatorMeta("o1")).get(0);
assertEquals("journal copied", "cid1", o1p1.getContainer().getExternalId());
- ids = new FSStorageAgent(appPath2 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, new Configuration()).getWindowIds(o1p1.getId());
+ CascadeStorageAgent csa = (CascadeStorageAgent)dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
+ Assert.assertEquals("storage agent is replaced by cascade", csa.getClass(), CascadeStorageAgent.class);
+ Assert.assertEquals("current storage agent is of same type", csa.getCurrentStorageAgent().getClass(), agent.getClass());
+ Assert.assertEquals("parent storage agent is of same type ", csa.getParentStorageAgent().getClass(), agent.getClass());
+ /* parent and current points to expected location */
+ Assert.assertEquals(true, ((FSStorageAgent)csa.getParentStorageAgent()).path.contains("app1"));
+ Assert.assertEquals(true, ((FSStorageAgent)csa.getCurrentStorageAgent()).path.contains("app2"));
+
+ ids = csa.getWindowIds(o1p1.getId());
Assert.assertArrayEquals("checkpoints copied", new long[] {o1p1.getRecoveryCheckpoint().getWindowId()}, ids);
+
+ /* simulate another application restart from app2 */
+ String appId3 = "app3";
+ String appPath3 = testMeta.getPath() + "/" + appId3;
+ dag = new LogicalPlan();
+ dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath3);
+ dag.setAttribute(LogicalPlan.APPLICATION_ID, appId3);
+ sc = new StramClient(new Configuration(), dag);
+ try {
+ sc.start();
+ sc.copyInitialState(new Path(appPath2)); // copy state from app2.
+ } finally {
+ sc.stop();
+ }
+ scm = StreamingContainerManager.getInstance(new FSRecoveryHandler(dag.assertAppPath(), new Configuration(false)), dag, false);
+ plan = scm.getPhysicalPlan();
+ dag = plan.getLogicalPlan();
+
+ csa = (CascadeStorageAgent)dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
+ Assert.assertEquals("storage agent is replaced by cascade", csa.getClass(), CascadeStorageAgent.class);
+ Assert.assertEquals("current storage agent is of same type", csa.getCurrentStorageAgent().getClass(), agent.getClass());
+ Assert.assertEquals("parent storage agent is of same type ", csa.getParentStorageAgent().getClass(), CascadeStorageAgent.class);
+
+ CascadeStorageAgent parent = (CascadeStorageAgent)csa.getParentStorageAgent();
+ Assert.assertEquals("current storage agent is of same type ", parent.getCurrentStorageAgent().getClass(), agent.getClass());
+ Assert.assertEquals("parent storage agent is cascade ", parent.getParentStorageAgent().getClass(), agent.getClass());
+ /* verify paths */
+ Assert.assertEquals(true, ((FSStorageAgent)parent.getParentStorageAgent()).path.contains("app1"));
+ Assert.assertEquals(true, ((FSStorageAgent)parent.getCurrentStorageAgent()).path.contains("app2"));
+ Assert.assertEquals(true, ((FSStorageAgent)csa.getCurrentStorageAgent()).path.contains("app3"));
+
+ ids = csa.getWindowIds(o1p1.getId());
+ Assert.assertArrayEquals("checkpoints copied", new long[] {o1p1.getRecoveryCheckpoint().getWindowId()}, ids);
}
@Test
[2/2] apex-core git commit: Merge branch 'restart_optimizations' of
github.com:tushargosavi/apex-core
Posted by pr...@apache.org.
Merge branch 'restart_optimizations' of github.com:tushargosavi/apex-core
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/88bf3362
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/88bf3362
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/88bf3362
Branch: refs/heads/master
Commit: 88bf336271d5c9c375ea9054b4c4f17b0e277102
Parents: 412a3bd 8825f5f
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Mon Apr 10 14:11:22 2017 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Apr 10 14:11:22 2017 -0700
----------------------------------------------------------------------
.../common/util/AsyncFSStorageAgent.java | 14 +-
.../datatorrent/common/util/FSStorageAgent.java | 11 +
.../apex/common/util/AsyncStorageAgent.java | 54 +++++
.../apex/common/util/CascadeStorageAgent.java | 202 +++++++++++++++++++
.../common/util/CascadeStorageAgentTest.java | 116 +++++++++++
.../java/com/datatorrent/stram/StramClient.java | 12 +-
.../stram/StreamingContainerManager.java | 49 +++--
.../java/com/datatorrent/stram/engine/Node.java | 17 +-
.../stram/plan/physical/PhysicalPlan.java | 9 +-
.../datatorrent/stram/StramRecoveryTest.java | 45 ++++-
10 files changed, 493 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/88bf3362/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------