You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:12:37 UTC
[06/50] incubator-apex-core git commit: writing checkpoints async
writing checkpoints async
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/29eb6c37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/29eb6c37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/29eb6c37
Branch: refs/heads/master
Commit: 29eb6c377e92242c540d9a4d8be43a1fe05b7ac2
Parents: 66a75e0
Author: Gaurav <ga...@datatorrent.com>
Authored: Thu Jul 30 11:15:24 2015 -0700
Committer: Gaurav <ga...@datatorrent.com>
Committed: Tue Aug 4 16:28:17 2015 -0700
----------------------------------------------------------------------
.../common/util/AsyncFSStorageAgent.java | 111 ++++++++++++++++
.../datatorrent/common/util/FSStorageAgent.java | 3 +-
.../common/codec/JsonStreamCodecTest.java | 15 ++-
.../common/util/AsyncFSStorageAgentTest.java | 133 +++++++++++++++++++
.../java/com/datatorrent/stram/StramClient.java | 5 +-
.../datatorrent/stram/StramLocalCluster.java | 4 +-
.../stram/StreamingAppMasterService.java | 2 +-
.../stram/StreamingContainerManager.java | 10 +-
.../java/com/datatorrent/stram/engine/Node.java | 64 ++++++++-
.../stram/plan/physical/PhysicalPlan.java | 8 +-
.../com/datatorrent/stram/CheckpointTest.java | 11 +-
.../stram/LogicalPlanModificationTest.java | 22 ++-
.../com/datatorrent/stram/PartitioningTest.java | 26 +++-
.../stram/StramLocalClusterTest.java | 22 ++-
.../datatorrent/stram/StramMiniClusterTest.java | 9 +-
.../datatorrent/stram/StramRecoveryTest.java | 56 ++++++--
.../stram/StreamingContainerManagerTest.java | 45 ++++++-
.../stram/debug/TupleRecorderTest.java | 3 +
.../stram/engine/AutoMetricTest.java | 2 +
.../stram/engine/InputOperatorTest.java | 5 +-
.../stram/engine/ProcessingModeTests.java | 9 ++
.../datatorrent/stram/engine/SliderTest.java | 5 +
.../com/datatorrent/stram/engine/StatsTest.java | 10 +-
.../stram/engine/WindowGeneratorTest.java | 11 +-
.../stram/webapp/StramWebServicesTest.java | 6 +-
25 files changed, 527 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
new file mode 100644
index 0000000..d5de61c
--- /dev/null
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -0,0 +1,111 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.common.util;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectStreamException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+// Storage agent that stages each checkpoint in a local file (save) and later publishes it under the checkpoint path (copyToHDFS).
+public class AsyncFSStorageAgent extends FSStorageAgent
+{
+ private final transient FileSystem fs;
+ private final transient Configuration conf;
+ private final String localBasePath;
+ // Read by the engine: true means copyToHDFS() is run inline after save(), false means it is handed to a background task.
+ private boolean syncCheckpoint = false;
+ // No-arg constructor that leaves every field null; presumably needed by the serialization framework - TODO confirm.
+ private AsyncFSStorageAgent()
+ {
+ super();
+ fs = null;
+ conf = null;
+ localBasePath = null;
+ }
+ // Stages checkpoints in the current working directory (".").
+ public AsyncFSStorageAgent(String path, Configuration conf)
+ {
+ this(".", path, conf);
+ }
+ // A null localBasePath falls back to "/tmp"; a null conf falls back to a default Configuration.
+ public AsyncFSStorageAgent(String localBasePath, String path, Configuration conf)
+ {
+ super(path, conf);
+ this.localBasePath = localBasePath == null ? "/tmp" : localBasePath;
+ logger.debug("Initialize storage agent with {}.", this.localBasePath);
+ this.conf = conf == null ? new Configuration() : conf;
+ try {
+ // Bind to the file system that owns the checkpoint path; the default file system rejects paths of another scheme.
+ fs = FileSystem.newInstance(new Path(path).toUri(), this.conf);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ // Writes the object with store() to the local file <localBasePath>/<operatorId>/<windowId>; nothing reaches the checkpoint path until copyToHDFS().
+ @Override
+ public void save(final Object object, final int operatorId, final long windowId) throws IOException
+ {
+ String operatorIdStr = String.valueOf(operatorId);
+ File directory = new File(localBasePath, operatorIdStr);
+ // mkdirs() also returns false when the directory already exists, hence the isDirectory() re-check.
+ if (!directory.mkdirs() && !directory.isDirectory()) {
+ throw new IOException("Failed to create local checkpoint directory " + directory);
+ }
+ try (FileOutputStream stream = new FileOutputStream(new File(directory, String.valueOf(windowId)))) {
+ store(stream, object);
+ }
+ }
+ // Publishes the file staged by save() as <path>/<operatorId>/<hex windowId>, overwriting an existing checkpoint of that name.
+ public void copyToHDFS(final int operatorId, final long windowId) throws IOException
+ {
+ String operatorIdStr = String.valueOf(operatorId);
+ File directory = new File(localBasePath, operatorIdStr);
+ String window = Long.toHexString(windowId);
+ Path lPath = new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + System.currentTimeMillis() + TMP_FILE);
+ // Copy under a temporary name first so the final rename publishes a complete file; deleteSource=true removes the staged copy.
+ FileUtil.copy(new File(directory, String.valueOf(windowId)), fs, lPath, true, conf);
+ fileContext.rename(lPath, new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + window), Options.Rename.OVERWRITE);
+ }
+ // Deserialization hook: returns a freshly constructed agent (transient fs/conf re-created) that keeps the syncCheckpoint setting.
+ @Override
+ public Object readResolve() throws ObjectStreamException
+ {
+ AsyncFSStorageAgent resolved = new AsyncFSStorageAgent(this.localBasePath, this.path, null);
+ resolved.setSyncCheckpoint(this.syncCheckpoint);
+ return resolved;
+ }
+ // Returns whether checkpoints are to be published synchronously.
+ public boolean isSyncCheckpoint()
+ {
+ return syncCheckpoint;
+ }
+ // Sets whether checkpoints are to be published synchronously (true) or asynchronously (false, the default).
+ public void setSyncCheckpoint(boolean syncCheckpoint)
+ {
+ this.syncCheckpoint = syncCheckpoint;
+ }
+
+ private static final long serialVersionUID = 201507241610L;
+ private static final Logger logger = LoggerFactory.getLogger(AsyncFSStorageAgent.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
index 31b537d..14275fa 100644
--- a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
@@ -52,8 +52,7 @@ public class FSStorageAgent implements StorageAgent, Serializable
kryo = new Kryo();
}
- @SuppressWarnings("unused")
- private FSStorageAgent()
+ protected FSStorageAgent()
{
path = null;
fileContext = null;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java b/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
index e0a5f01..a9303bc 100644
--- a/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
+++ b/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
@@ -1,14 +1,17 @@
/**
* Copyright (C) 2015 DataTorrent, Inc.
*
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package com.datatorrent.common.codec;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
new file mode 100644
index 0000000..e7f9f66
--- /dev/null
+++ b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
@@ -0,0 +1,133 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.common.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DAG;
+// Round-trip tests for AsyncFSStorageAgent against a per-test directory under target/.
+public class AsyncFSStorageAgentTest
+{
+ private static class TestMeta extends TestWatcher
+ {
+ String applicationPath;
+ String basePath;
+ AsyncFSStorageAgent storageAgent;
+ // Creates the per-test base directory and a fresh agent that stages in basePath and publishes to basePath/app.
+ @Override
+ protected void starting(Description description)
+ {
+ super.starting(description);
+ basePath = "target/" + description.getClassName() + "/" + description.getMethodName();
+ applicationPath = basePath + "/app";
+ try {
+ FileUtils.forceMkdir(new File(basePath));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ storageAgent = new AsyncFSStorageAgent(basePath, applicationPath, null);
+ // NOTE(review): the attribute map below is populated but never read in this class.
+ Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(DAG.APPLICATION_PATH, applicationPath);
+ }
+ // Deletes target/<test class name> after each test.
+ @Override
+ protected void finished(Description description)
+ {
+ try {
+ FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+ // save + copyToHDFS followed by load must return a map equal to the one saved.
+ @Test
+ public void testSave() throws IOException
+ {
+ Map<Integer, String> data = Maps.newHashMap();
+ data.put(1, "one");
+ data.put(2, "two");
+ data.put(3, "three");
+ testMeta.storageAgent.save(data, 1, 1);
+ testMeta.storageAgent.copyToHDFS(1, 1);
+ @SuppressWarnings("unchecked")
+ Map<Integer, String> decoded = (Map<Integer, String>) testMeta.storageAgent.load(1, 1);
+ Assert.assertEquals("dataOf1", data, decoded);
+ }
+ // Checkpoints of two operators for the same window are stored and loaded independently.
+ @Test
+ public void testLoad() throws IOException
+ {
+ Map<Integer, String> dataOf1 = Maps.newHashMap();
+ dataOf1.put(1, "one");
+ dataOf1.put(2, "two");
+ dataOf1.put(3, "three");
+
+ Map<Integer, String> dataOf2 = Maps.newHashMap();
+ dataOf2.put(4, "four");
+ dataOf2.put(5, "five");
+ dataOf2.put(6, "six");
+
+ testMeta.storageAgent.save(dataOf1, 1, 1);
+ testMeta.storageAgent.copyToHDFS(1, 1);
+ testMeta.storageAgent.save(dataOf2, 2, 1);
+ testMeta.storageAgent.copyToHDFS(2, 1);
+ @SuppressWarnings("unchecked")
+ Map<Integer, String> decoded1 = (Map<Integer, String>) testMeta.storageAgent.load(1, 1);
+
+ @SuppressWarnings("unchecked")
+ Map<Integer, String> decoded2 = (Map<Integer, String>) testMeta.storageAgent.load(2, 1);
+ Assert.assertEquals("data of 1", dataOf1, decoded1);
+ Assert.assertEquals("data of 2", dataOf2, decoded2);
+ }
+ // A second agent created over the same directories can save and load the same checkpoint again.
+ @Test
+ public void testRecovery() throws IOException
+ {
+ testSave();
+ testMeta.storageAgent = new AsyncFSStorageAgent(testMeta.basePath, testMeta.applicationPath, null);
+ testSave();
+ }
+ // After delete(1, 1) operator 1's window-1 file must be gone while operator 2's file must remain.
+ @Test
+ public void testDelete() throws IOException
+ {
+ testLoad();
+ testMeta.storageAgent.delete(1, 1);
+ Path appPath = new Path(testMeta.applicationPath);
+ FileContext fileContext = FileContext.getFileContext();
+ Assert.assertTrue("operator 2 window 1", fileContext.util().exists(new Path(appPath + "/" + 2 + "/" + 1)));
+ Assert.assertFalse("operator 1 window 1", fileContext.util().exists(new Path(appPath + "/" + 1 + "/" + 1)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 657f678..dfb4511 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.DTLoggerFactory;
import com.datatorrent.api.Context.OperatorContext;
-
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BasicContainerOptConfigurator;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.client.StramClientUtils;
@@ -456,8 +456,9 @@ public class StramClient
if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) { /* which would be the most likely case */
Path checkpointPath = new Path(appPath, LogicalPlan.SUBDIR_CHECKPOINTS);
// use conf client side to pickup any proxy settings from dt-site.xml
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(checkpointPath.toString(), conf));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointPath.toString(), conf));
}
+
if(dag.getAttributes().get(LogicalPlan.CONTAINER_OPTS_CONFIGURATOR) == null){
dag.setAttribute(LogicalPlan.CONTAINER_OPTS_CONFIGURATOR,new BasicContainerOptConfigurator());
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index c7ac0cb..e28c097 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -35,11 +35,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.net.NetUtils;
+import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode.Controller;
import com.datatorrent.api.Operator;
import com.datatorrent.bufferserver.server.Server;
import com.datatorrent.bufferserver.storage.DiskStorage;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
import com.datatorrent.stram.StreamingContainerManager.ContainerResource;
@@ -298,7 +300,7 @@ public class StramLocalCluster implements Runnable, Controller
dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, pathUri);
}
if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) {
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(new Path(pathUri, LogicalPlan.SUBDIR_CHECKPOINTS).toString(), null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new Path(pathUri, LogicalPlan.SUBDIR_CHECKPOINTS).toString(), null));
}
this.dnmgr = new StreamingContainerManager(dag);
this.umbilical = new UmbilicalProtocolLocalImpl();
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index dbb3d11..5246c9e 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 95f4648..0847f3c 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -76,6 +76,7 @@ import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.bufferserver.auth.AuthManager;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.common.experimental.AppData;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.common.util.NumberAggregate;
import com.datatorrent.common.util.Pair;
@@ -2949,7 +2950,14 @@ public class StreamingContainerManager implements PlanContext
this.finals = new FinalVars(finals, lp);
StorageAgent sa = lp.getValue(OperatorContext.STORAGE_AGENT);
- if (sa instanceof FSStorageAgent) {
+ if(sa instanceof AsyncFSStorageAgent){
+ // replace the default storage agent, if present
+ AsyncFSStorageAgent fssa = (AsyncFSStorageAgent) sa;
+ if (fssa.path.contains(oldAppId)) {
+ fssa = new AsyncFSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
+ lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa);
+ }
+ } else if (sa instanceof FSStorageAgent) {
// replace the default storage agent, if present
FSStorageAgent fssa = (FSStorageAgent) sa;
if (fssa.path.contains(oldAppId)) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index 24679dc..ea33970 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -27,8 +27,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
@@ -46,6 +45,9 @@ import com.datatorrent.api.Operator.Unifier;
import com.datatorrent.api.StatsListener.OperatorRequest;
import com.datatorrent.bufferserver.util.Codec;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
+import com.datatorrent.common.util.Pair;
+import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
@@ -99,12 +101,16 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
public final BlockingQueue<StatsListener.OperatorResponse> commandResponse;
private final List<Field> metricFields;
private final Map<String, Method> metricMethods;
+ private ExecutorService executorService;
+ private Queue<Pair<FutureTask<Stats.CheckpointStats>, Long>> taskQueue;
protected Stats.CheckpointStats checkpointStats;
public Node(OPERATOR operator, OperatorContext context)
{
this.operator = operator;
this.context = context;
+ executorService = Executors.newSingleThreadExecutor();
+ taskQueue = new LinkedList<Pair<FutureTask<Stats.CheckpointStats>, Long>>();
outputs = new HashMap<String, Sink<Object>>();
@@ -173,6 +179,9 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
pcpair.component.teardown();
}
+ if (executorService != null) {
+ executorService.shutdownNow();
+ }
operator.teardown();
}
@@ -405,6 +414,21 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
checkpointStats = null;
checkpoint = null;
}
+ else {
+ Pair<FutureTask<Stats.CheckpointStats>, Long> pair = taskQueue.peek();
+ if (pair != null && pair.getFirst().isDone()) {
+ taskQueue.poll();
+ try {
+ stats.checkpointStats = pair.getFirst().get();
+ stats.checkpoint = new Checkpoint(pair.getSecond(), applicationWindowCount, checkpointWindowCount);
+ if (operator instanceof Operator.CheckpointListener) {
+ ((Operator.CheckpointListener) operator).checkpointed(pair.getSecond());
+ }
+ } catch (Exception ex) {
+ throw DTThrowable.wrapIfChecked(ex);
+ }
+ }
+ }
context.report(stats, windowId);
}
@@ -440,6 +464,25 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
checkpointStats = new Stats.CheckpointStats();
checkpointStats.checkpointStartTime = System.currentTimeMillis();
ba.save(operator, id, windowId);
+ if (ba instanceof AsyncFSStorageAgent) {
+ AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent) ba;
+ if (!asyncFSStorageAgent.isSyncCheckpoint() && PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
+ CheckpointHandler checkpointHandler = new CheckpointHandler();
+ checkpointHandler.agent = asyncFSStorageAgent;
+ checkpointHandler.operatorId = id;
+ checkpointHandler.windowId = windowId;
+ checkpointHandler.stats = checkpointStats;
+ FutureTask<Stats.CheckpointStats> futureTask = new FutureTask<Stats.CheckpointStats>(checkpointHandler);
+ taskQueue.add(new Pair<FutureTask<Stats.CheckpointStats>, Long>(futureTask, windowId));
+ executorService.submit(futureTask);
+ checkpoint = null;
+ checkpointStats = null;
+ return;
+ }
+ else {
+ asyncFSStorageAgent.copyToHDFS(id, windowId);
+ }
+ }
checkpointStats.checkpointTime = System.currentTimeMillis() - checkpointStats.checkpointStartTime;
}
catch (IOException ie) {
@@ -570,5 +613,22 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
deactivateSinks();
}
+ private class CheckpointHandler implements Callable<Stats.CheckpointStats>
+ {
+ // Background task that publishes one staged checkpoint; all inputs are set by the caller before the task is submitted.
+ public AsyncFSStorageAgent agent;
+ public int operatorId;
+ public long windowId;
+ public Stats.CheckpointStats stats;
+ // Copies the staged checkpoint of (operatorId, windowId), then sets stats.checkpointTime to the time elapsed since checkpointStartTime.
+ @Override
+ public Stats.CheckpointStats call() throws Exception
+ {
+ agent.copyToHDFS(operatorId, windowId);
+ stats.checkpointTime = System.currentTimeMillis() - stats.checkpointStartTime;
+ return stats;
+ }
+ }
+
private static final Logger logger = LoggerFactory.getLogger(Node.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 7c0432d..5b90c04 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -40,6 +40,8 @@ import com.datatorrent.api.Partitioner.Partition;
import com.datatorrent.api.Partitioner.PartitionKeys;
import com.datatorrent.api.StatsListener.OperatorRequest;
import com.datatorrent.api.annotation.Stateless;
+
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.Journal.Recoverable;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.StramEvent;
@@ -941,7 +943,11 @@ public class PhysicalPlan implements Serializable
try {
LOG.debug("Writing activation checkpoint {} {} {}", checkpoint, oper, oo);
long windowId = oper.isOperatorStateLess() ? Stateless.WINDOW_ID : checkpoint.windowId;
- oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT).save(oo, oper.id, windowId);
+ StorageAgent agent = oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT);
+ agent.save(oo, oper.id, windowId);
+ if (agent instanceof AsyncFSStorageAgent) {
+ ((AsyncFSStorageAgent) agent).copyToHDFS(oper.id, windowId);
+ }
} catch (IOException e) {
// inconsistent state, no recovery option, requires shutdown
throw new IllegalStateException("Failed to write operator state after partition change " + oper, e);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index dd804ec..4072894 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -37,6 +37,7 @@ import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.MockContainer.MockOperatorStats;
import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext;
import com.datatorrent.stram.api.Checkpoint;
@@ -111,6 +112,9 @@ public class CheckpointTest
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+ AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.dir + "/locaPath", testMeta.dir, null);
+ storageAgent.setSyncCheckpoint(true);
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent);
dag.setAttribute(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 1);
dag.setAttribute(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50);
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -127,14 +131,13 @@ public class CheckpointTest
sc.setHeartbeatMonitoringEnabled(false);
sc.run();
- StorageAgent fssa = sc.getDAG().getValue(OperatorContext.STORAGE_AGENT);
StreamingContainerManager dnm = sc.dnmgr;
PhysicalPlan plan = dnm.getPhysicalPlan();
Assert.assertEquals("number required containers", 1, dnm.getPhysicalPlan().getContainers().size());
PTOperator o1p1 = plan.getOperators(dag.getMeta(o1)).get(0);
Set<Long> checkpoints = Sets.newHashSet();
- for (long windowId : fssa.getWindowIds(o1p1.getId())) {
+ for (long windowId : storageAgent.getWindowIds(o1p1.getId())) {
checkpoints.add(windowId);
}
Assert.assertEquals("number checkpoints " + checkpoints, 3, checkpoints.size());
@@ -142,7 +145,7 @@ public class CheckpointTest
PTOperator o2p1 = plan.getOperators(dag.getMeta(o2)).get(0);
checkpoints = Sets.newHashSet();
- for (long windowId : fssa.getWindowIds(o2p1.getId())) {
+ for (long windowId : storageAgent.getWindowIds(o2p1.getId())) {
checkpoints.add(windowId);
}
Assert.assertEquals("number checkpoints " + checkpoints, 1, checkpoints.size());
@@ -152,7 +155,7 @@ public class CheckpointTest
Assert.assertNotNull("checkpoint not null for statefull operator " + o1p1, o1p1.stats.checkpointStats);
for (Checkpoint cp : o1p1.checkpoints) {
- Object load = fssa.load(o1p1.getId(), cp.windowId);
+ Object load = storageAgent.load(o1p1.getId(), cp.windowId);
Assert.assertEquals("Stored Operator and Saved State", load.getClass(), o1p1.getOperatorMeta().getOperator().getClass());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
index db1d9ec..78a1bd8 100644
--- a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
@@ -27,9 +27,10 @@ import org.junit.Rule;
import org.junit.Test;
import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StorageAgent;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.FSStorageAgent;
-import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
@@ -291,15 +292,14 @@ public class LogicalPlanModificationTest
}
- @Test
- public void testExecutionManager() throws Exception {
+ private void testExecutionManager(StorageAgent agent) throws Exception {
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
StreamingContainerManager dnm = new StreamingContainerManager(dag);
- Assert.assertEquals(""+dnm.containerStartRequests, dnm.containerStartRequests.size(), 0);
+ Assert.assertEquals("" + dnm.containerStartRequests, dnm.containerStartRequests.size(), 0);
CreateOperatorRequest cor = new CreateOperatorRequest();
@@ -331,4 +331,16 @@ public class LogicalPlanModificationTest
}
+ @Test
+ public void testExecutionManagerWithSyncStorageAgent() throws Exception
+ {
+ testExecutionManager(new FSStorageAgent(testMeta.dir, null)); // shared scenario run with the plain FSStorageAgent
+ }
+
+ @Test
+ public void testExecutionManagerWithAsyncStorageAgent() throws Exception
+ {
+ testExecutionManager(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); // same scenario with the async agent staging under testMeta.dir/localPath
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
index 9c169ee..15ad76e 100644
--- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
@@ -27,13 +27,13 @@ import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import com.google.common.collect.Sets;
+
import com.datatorrent.api.*;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
-import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.StramLocalCluster.LocalStreamingContainer;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.engine.Node;
@@ -150,6 +150,8 @@ public class PartitioningTest
public void testDefaultPartitioning() throws Exception
{
LogicalPlan dag = new LogicalPlan();
+ File checkpointDir = new File(TEST_OUTPUT_DIR, "testDefaultPartitioning");
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null));
Integer[][] testData = {
{4, 5}
@@ -249,6 +251,9 @@ public class PartitioningTest
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 5);
+ File checkpointDir = new File(TEST_OUTPUT_DIR, "testDynamicDefaultPartitioning");
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null));
+
CollectorOperator.receivedTuples.clear();
TestInputOperator<Integer> input = dag.addOperator("input", new TestInputOperator<Integer>());
@@ -391,12 +396,12 @@ public class PartitioningTest
*
* @throws Exception
*/
- @Test
- public void testInputOperatorPartitioning() throws Exception
+
+ private void testInputOperatorPartitioning(LogicalPlan dag) throws Exception
{
File checkpointDir = new File(TEST_OUTPUT_DIR, "testInputOperatorPartitioning");
- LogicalPlan dag = new LogicalPlan();
dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, checkpointDir.getPath());
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null));
PartitionableInputOperator input = dag.addOperator("input", new PartitionableInputOperator());
dag.setAttribute(input, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitionLoadWatch()}));
@@ -418,7 +423,10 @@ public class PartitioningTest
Checkpoint checkpoint = new Checkpoint(10L, 0, 0);
p.checkpoints.add(checkpoint);
p.setRecoveryCheckpoint(checkpoint);
- new FSStorageAgent(checkpointDir.getPath(), null).save(inputDeployed, p.getId(), 10L);
+ AsyncFSStorageAgent agent = new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null);
+ agent.save(inputDeployed, p.getId(), 10L);
+ agent.copyToHDFS(p.getId(), 10l);
+
}
Assert.assertEquals("", Sets.newHashSet("partition_0", "partition_1", "partition_2"), partProperties);
@@ -447,6 +455,12 @@ public class PartitioningTest
}
+ @Test
+ public void testInputOperatorPartitioningWithAsyncStorageAgent() throws Exception
+ {
+ LogicalPlan dag = new LogicalPlan();
+ testInputOperatorPartitioning(dag);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
index 8489c70..1881566 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
@@ -15,7 +15,6 @@
*/
package com.datatorrent.stram;
-import com.datatorrent.stram.api.Checkpoint;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
@@ -23,22 +22,17 @@ import java.io.LineNumberReader;
import java.util.Arrays;
import java.util.Map;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.api.Context;
+
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.StramLocalCluster.LocalStreamingContainer;
import com.datatorrent.stram.StramLocalCluster.MockComponentFactory;
-import com.datatorrent.stram.engine.GenericTestOperator;
-import com.datatorrent.stram.engine.Node;
-import com.datatorrent.stram.engine.OperatorContext;
-import com.datatorrent.stram.engine.TestGeneratorInputOperator;
-import com.datatorrent.stram.engine.TestOutputOperator;
-import com.datatorrent.stram.engine.WindowGenerator;
+import com.datatorrent.stram.api.Checkpoint;
+import com.datatorrent.stram.engine.*;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.support.ManualScheduledExecutorService;
@@ -75,6 +69,7 @@ public class StramLocalClusterTest
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
TestGeneratorInputOperator genNode = dag.addOperator("genNode", TestGeneratorInputOperator.class);
genNode.setMaxTuples(2);
@@ -114,6 +109,9 @@ public class StramLocalClusterTest
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+ AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null);
+ agent.setSyncCheckpoint(true);
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, agent);
TestGeneratorInputOperator node1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
// data will be added externally from test
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
index 4d0cd37..99478f5 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
@@ -48,6 +48,7 @@ import com.sun.jersey.api.client.WebResource;
import com.datatorrent.api.*;
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.client.StramClientUtils.YarnClientHelper;
import com.datatorrent.stram.engine.GenericTestOperator;
@@ -202,6 +203,9 @@ public class StramMiniClusterTest
LogicalPlanConfiguration tb = new LogicalPlanConfiguration(conf);
tb.addFromProperties(dagProps, null);
LogicalPlan dag = createDAG(tb);
+ AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null);
+ agent.setSyncCheckpoint(true);
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
Configuration yarnConf = new Configuration(yarnCluster.getConfig());
StramClient client = new StramClient(yarnConf, dag);
try {
@@ -357,7 +361,10 @@ public class StramMiniClusterTest
{
LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, "file:" + System.getProperty("user.dir") + "/" + testMeta.dir);
+ dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+ AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null);
+ agent.setSyncCheckpoint(true);
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
FailingOperator badOperator = dag.addOperator("badOperator", FailingOperator.class);
dag.getContextAttributes(badOperator).put(OperatorContext.RECOVERY_ATTEMPTS, 1);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 8515734..6172d8a 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -46,6 +46,7 @@ import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StorageAgent;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
@@ -70,8 +71,7 @@ public class StramRecoveryTest
private static final Logger LOG = LoggerFactory.getLogger(StramRecoveryTest.class);
@Rule public final TestMeta testMeta = new TestMeta();
- @Test
- public void testPhysicalPlanSerialization() throws Exception
+ private void testPhysicalPlanSerialization(StorageAgent agent) throws Exception
{
LogicalPlan dag = new LogicalPlan();
@@ -86,7 +86,7 @@ public class StramRecoveryTest
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
TestPlanContext ctx = new TestPlanContext();
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
PhysicalPlan plan = new PhysicalPlan(dag, ctx);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -121,6 +121,18 @@ public class StramRecoveryTest
}
+ @Test
+ public void testPhysicalPlanSerializationWithSyncAgent() throws Exception
+ {
+ testPhysicalPlanSerialization(new FSStorageAgent(testMeta.dir, null));
+ }
+
+ @Test
+ public void testPhysicalPlanSerializationWithAsyncAgent() throws Exception
+ {
+ testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+ }
+
public static class StatsListeningOperator extends TestGeneratorInputOperator implements StatsListener
{
int processStatsCnt = 0;
@@ -144,14 +156,13 @@ public class StramRecoveryTest
* Test serialization of the container manager with mock execution layer.
* @throws Exception
*/
- @Test
- public void testContainerManager() throws Exception
+ private void testContainerManager(StorageAgent agent) throws Exception
{
FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
StatsListeningOperator o1 = dag.addOperator("o1", StatsListeningOperator.class);
@@ -254,6 +265,18 @@ public class StramRecoveryTest
}
@Test
+ public void testContainerManagerWithSyncAgent() throws Exception
+ {
+ testContainerManager(new FSStorageAgent(testMeta.dir, null));
+ }
+
+ @Test
+ public void testContainerManagerWithAsyncAgent() throws Exception
+ {
+ testContainerManager(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+ }
+
+ @Test
public void testWriteAheadLog() throws Exception
{
final MutableInt flushCount = new MutableInt();
@@ -358,19 +381,17 @@ public class StramRecoveryTest
scm.setPhysicalOperatorProperty(o1p1.getId(), "maxTuples", "50");
}
- @Test
- public void testRestartApp() throws Exception
+ private void testRestartApp(StorageAgent agent, String appPath1) throws Exception
{
FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run
String appId1 = "app1";
String appId2 = "app2";
- String appPath1 = testMeta.dir + "/" + appId1;
String appPath2 = testMeta.dir + "/" + appId2;
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.APPLICATION_ID, appId1);
dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath1);
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
dag.addOperator("o1", StatsListeningOperator.class);
FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(dag.assertAppPath(), new Configuration(false));
@@ -419,6 +440,21 @@ public class StramRecoveryTest
}
@Test
+ public void testRestartAppWithSyncAgent() throws Exception
+ {
+ String appPath1 = testMeta.dir + "/app1";
+ testRestartApp(new FSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1);
+ }
+
+ @Test
+ public void testRestartAppWithAsyncAgent() throws Exception
+ {
+ String appPath1 = testMeta.dir + "/app1";
+ String checkpointPath = testMeta.dir + "/localPath";
+ testRestartApp(new AsyncFSStorageAgent(checkpointPath, appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1);
+ }
+
+ @Test
public void testRpcFailover() throws Exception
{
String appPath = testMeta.dir;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index c1567b8..ba15a78 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -32,6 +32,7 @@ import org.junit.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG.Locality;
@@ -41,6 +42,7 @@ import com.datatorrent.api.StatsListener;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
import com.datatorrent.stram.StreamingContainerManager.ContainerResource;
@@ -56,12 +58,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerSt
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat;
import com.datatorrent.stram.appdata.AppDataPushAgent;
import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
-import com.datatorrent.stram.engine.DefaultUnifier;
-import com.datatorrent.stram.engine.GenericTestOperator;
-import com.datatorrent.stram.engine.TestAppDataQueryOperator;
-import com.datatorrent.stram.engine.TestAppDataResultOperator;
-import com.datatorrent.stram.engine.TestAppDataSourceOperator;
-import com.datatorrent.stram.engine.TestGeneratorInputOperator;
+import com.datatorrent.stram.engine.*;
import com.datatorrent.stram.plan.TestPlanContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
@@ -471,6 +468,37 @@ public class StreamingContainerManagerTest {
}
@Test
+ public void testAsyncCheckpointWindowIds() throws Exception
+ {
+ File path = new File(testMeta.dir);
+ FileUtils.deleteDirectory(path.getAbsoluteFile());
+ FileUtils.forceMkdir(new File(path.getAbsoluteFile(), "/localPath"));
+
+ AsyncFSStorageAgent sa = new AsyncFSStorageAgent(path.getPath() + "/localPath", path.getPath(), null);
+
+ long[] windowIds = new long[]{123L, 345L, 234L};
+ for (long windowId : windowIds) {
+ sa.save(windowId, 1, windowId);
+ sa.copyToHDFS(1, windowId);
+ }
+
+ Arrays.sort(windowIds);
+ long[] savedWindowIds = sa.getWindowIds(1);
+ Arrays.sort(savedWindowIds);
+ Assert.assertArrayEquals("Saved windowIds", windowIds, savedWindowIds);
+
+ for (long windowId : windowIds) {
+ sa.delete(1, windowId);
+ }
+ try {
+ sa.getWindowIds(1);
+ Assert.fail("There should not be any most recently saved windowId!");
+ } catch (IOException expected) {
+ // expected: this test requires getWindowIds to fail once every saved window has been deleted
+ }
+ }
+
+ @Test
public void testProcessHeartbeat() throws Exception
{
FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run
@@ -712,6 +740,8 @@ public class StreamingContainerManagerTest {
@Test
public void testPhysicalPropertyUpdate() throws Exception{
LogicalPlan dag = new LogicalPlan();
+ String workingDir = new File("target/testPhysicalPropertyUpdate").getAbsolutePath();
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.addStream("o1.outport", o1.outport, o2.inport1);
@@ -735,7 +765,6 @@ public class StreamingContainerManagerTest {
Class<? extends TestAppDataSourceOperator> dsClass, Class<? extends TestAppDataResultOperator> rClass)
{
LogicalPlan dag = new LogicalPlan();
-
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
TestAppDataQueryOperator q = dag.addOperator("q", qClass);
TestAppDataResultOperator r = dag.addOperator("r", rClass);
@@ -755,6 +784,8 @@ public class StreamingContainerManagerTest {
private void testAppDataSources(LogicalPlan dag, boolean appendQIDToTopic) throws Exception
{
+ String workingDir = new File("target/testAppDataSources").getAbsolutePath();
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
StramLocalCluster lc = new StramLocalCluster(dag);
lc.runAsync();
StreamingContainerManager dnmgr = lc.dnmgr;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
index 5f68c6a..718bf1b 100644
--- a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.debug.TupleRecorder.PortInfo;
@@ -210,6 +212,7 @@ public class TupleRecorderTest
public void testRecordingFlow() throws Exception
{
LogicalPlan dag = new LogicalPlan();
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir.getAbsolutePath() + "/localPath", testWorkDir.getAbsolutePath(), null));
dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, "file://" + testWorkDir.getAbsolutePath());
dag.getAttributes().put(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE, 1024); // 1KB per part
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
index a95956e..752adeb 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
@@ -41,6 +41,7 @@ import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.Stats.OperatorStats;
import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.engine.AutoMetricTest.TestOperator.TestStatsListener;
import com.datatorrent.stram.plan.logical.LogicalPlan;
@@ -183,6 +184,7 @@ public class AutoMetricTest
public void testMetricPropagation() throws Exception
{
LogicalPlan dag = new LogicalPlan();
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
index 83bd61f..142f45f 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
@@ -20,6 +20,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.support.StramTestSupport.WaitCondition;
import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
@@ -28,13 +29,13 @@ import com.datatorrent.api.Operator;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.netlet.util.CircularBuffer;
+import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
-
import org.junit.Test;
/**
@@ -124,6 +125,8 @@ public class InputOperatorTest
public void testSomeMethod() throws Exception
{
LogicalPlan dag = new LogicalPlan();
+ String testWorkDir = new File("target").getAbsolutePath();
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir + "/localBasePath", testWorkDir, null));
EvenOddIntegerGeneratorInputOperator generator = dag.addOperator("NumberGenerator", EvenOddIntegerGeneratorInputOperator.class);
final CollectorModule<Number> collector = dag.addOperator("NumberCollector", new CollectorModule<Number>());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index bada257..0393394 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -15,7 +15,10 @@
*/
package com.datatorrent.stram.engine;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
+
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -75,6 +78,8 @@ public class ProcessingModeTests
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
+ String workingDir = new File("target/testLinearInputOperatorRecovery").getAbsolutePath();
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
RecoverableInputOperator rip = dag.addOperator("LongGenerator", RecoverableInputOperator.class);
rip.setMaximumTuples(maxTuples);
rip.setSimulateFailure(true);
@@ -97,6 +102,8 @@ public class ProcessingModeTests
CollectorOperator.duplicates.clear();
LogicalPlan dag = new LogicalPlan();
+ String workingDir = new File("target/testLinearOperatorRecovery").getAbsolutePath();
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -121,6 +128,8 @@ public class ProcessingModeTests
CollectorOperator.duplicates.clear();
LogicalPlan dag = new LogicalPlan();
+ String workingDir = new File("target/testLinearInlineOperatorsRecovery").getAbsolutePath();
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
index 754b150..26515d4 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
@@ -15,6 +15,9 @@
*/
package com.datatorrent.stram.engine;
+import java.io.File;
+
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
import org.junit.Assert;
import org.junit.Test;
@@ -133,6 +136,8 @@ public class SliderTest
private void test(int applicationWindowCount, int slideByWindowCount) throws Exception
{
LogicalPlan dag = new LogicalPlan();
+ String workingDir = new File("target/sliderTest").getAbsolutePath();
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 100);
Input input = dag.addOperator("Input", new Input());
Sum sum = dag.addOperator("Sum", new Sum());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
index 0019f56..0ededd4 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
@@ -15,6 +15,7 @@
*/
package com.datatorrent.stram.engine;
+import java.io.File;
import java.io.Serializable;
import java.util.*;
@@ -23,12 +24,14 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Stats.OperatorStats;
import com.datatorrent.api.Stats.OperatorStats.PortStats;
import com.datatorrent.api.StatsListener;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
+import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.engine.StatsTest.TestCollector.TestCollectorStatsListener;
@@ -170,7 +173,8 @@ public class StatsTest
{
int tupleCount = 10;
LogicalPlan dag = new LogicalPlan();
-
+ String workingDir = new File("target").getAbsolutePath();
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class);
TestInputStatsListener testInputStatsListener = new TestInputStatsListener();
dag.setAttribute(testOper, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{testInputStatsListener}));
@@ -225,6 +229,8 @@ public class StatsTest
private void baseTestForQueueSize(int maxTuples, TestCollectorStatsListener statsListener, DAG.Locality locality) throws Exception
{
LogicalPlan dag = new LogicalPlan();
+ String workingDir = new File("target/baseTestForQueueSize").getAbsolutePath();
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 200);
TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class);
testOper.setMaxTuples(maxTuples);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
index a6897e0..4f7b842 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
@@ -15,6 +15,7 @@
*/
package com.datatorrent.stram.engine;
+import java.io.File;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -24,12 +25,11 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.api.*;
import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Sink;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
@@ -305,7 +305,8 @@ public class WindowGeneratorTest
{
logger.info("Testing Out of Sequence Error");
LogicalPlan dag = new LogicalPlan();
-
+ String workingDir = new File("target/testOutofSequenceError").getAbsolutePath();
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
RandomNumberGenerator rng = dag.addOperator("random", new RandomNumberGenerator());
MyLogger ml = dag.addOperator("logger", new MyLogger());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
index 49c7844..9b8f0b2 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
@@ -56,6 +56,8 @@ import static org.junit.Assert.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.StramAppContext;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.plan.logical.LogicalPlan;
@@ -125,7 +127,9 @@ public class StramWebServicesTest extends JerseyTest
protected void configureServlets()
{
LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, new File("target", StramWebServicesTest.class.getName()).getAbsolutePath());
+ String workingDir = new File("target", StramWebServicesTest.class.getName()).getAbsolutePath();
+ dag.setAttribute(LogicalPlan.APPLICATION_PATH, workingDir);
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
final DummyStreamingContainerManager streamingContainerManager = new DummyStreamingContainerManager(dag);
appContext = new TestAppContext();