You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2016/03/29 01:36:56 UTC
[1/2] incubator-apex-malhar git commit: APEXMALHAR-2031 enable
WindowDataManager to save window files under user configured path
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/master c5cab8bd5 -> c3f6951a5
APEXMALHAR-2031 enable WindowDataManager to save window files under user configured path
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/dec419fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/dec419fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/dec419fc
Branch: refs/heads/master
Commit: dec419fc2ddbd4a82598a255c25cdf774d0c0efc
Parents: c5cab8b
Author: Chandni Singh <cs...@apache.org>
Authored: Mon Mar 28 15:57:19 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Mar 28 16:14:59 2016 -0700
----------------------------------------------------------------------
.../datatorrent/lib/util/WindowDataManager.java | 35 ++-
.../lib/util/FSWindowDataManagerTest.java | 216 +++++++++++++++++++
.../com/datatorrent/lib/util/TestUtils.java | 22 +-
.../lib/util/WindowDataManagerTest.java | 200 -----------------
4 files changed, 267 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dec419fc/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
index 7517cd4..9930d7e 100644
--- a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
+++ b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
@@ -113,6 +113,8 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
@NotNull
private String recoveryPath;
+ private boolean isRecoveryPathRelativeToAppPath = true;
+
/**
* largest window for which there is recovery data across all physical operator instances.
*/
@@ -144,7 +146,11 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
public void setup(Context.OperatorContext context)
{
Configuration configuration = new Configuration();
- appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath);
+ if (isRecoveryPathRelativeToAppPath) {
+ appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath);
+ } else {
+ appPath = new Path(recoveryPath);
+ }
try {
storageAgent = new FSStorageAgent(appPath.toString(), configuration);
@@ -333,15 +339,42 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
}
}
+ /**
+ * @return recovery path
+ */
public String getRecoveryPath()
{
return recoveryPath;
}
+ /**
+ * Sets the recovery path. If {@link #isRecoveryPathRelativeToAppPath} is true then this path is handled relative
+ * to the application path; otherwise it is handled as an absolute path.
+ *
+ * @param recoveryPath recovery path
+ */
public void setRecoveryPath(String recoveryPath)
{
this.recoveryPath = recoveryPath;
}
+
+ /**
+ * @return true if recovery path is relative to app path; false otherwise.
+ */
+ public boolean isRecoveryPathRelativeToAppPath()
+ {
+ return isRecoveryPathRelativeToAppPath;
+ }
+
+ /**
+ * Specifies whether the recovery path is relative to application path.
+ *
+ * @param recoveryPathRelativeToAppPath true if recovery path is relative to application path; false otherwise.
+ */
+ public void setRecoveryPathRelativeToAppPath(boolean recoveryPathRelativeToAppPath)
+ {
+ isRecoveryPathRelativeToAppPath = recoveryPathRelativeToAppPath;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dec419fc/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java b/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java
new file mode 100644
index 0000000..26996e7
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+
+/**
+ * Tests for {@link WindowDataManager}
+ */
+public class FSWindowDataManagerTest
+{
+ private static class TestMeta extends TestWatcher
+ {
+
+ String applicationPath;
+ WindowDataManager.FSWindowDataManager storageManager;
+ Context.OperatorContext context;
+
+ @Override
+ protected void starting(Description description)
+ {
+ TestUtils.deleteTargetTestClassFolder(description);
+ super.starting(description);
+ storageManager = new WindowDataManager.FSWindowDataManager();
+ applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+
+ Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(DAG.APPLICATION_PATH, applicationPath);
+ context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ TestUtils.deleteTargetTestClassFolder(description);
+ }
+ }
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+ @Test
+ public void testLargestRecoveryWindow()
+ {
+ testMeta.storageManager.setup(testMeta.context);
+ Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, testMeta.storageManager.getLargestRecoveryWindow());
+ testMeta.storageManager.teardown();
+ }
+
+ @Test
+ public void testSave() throws IOException
+ {
+ testMeta.storageManager.setup(testMeta.context);
+ Map<Integer, String> data = Maps.newHashMap();
+ data.put(1, "one");
+ data.put(2, "two");
+ data.put(3, "three");
+ testMeta.storageManager.save(data, 1, 1);
+ testMeta.storageManager.setup(testMeta.context);
+ @SuppressWarnings("unchecked")
+ Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1);
+ Assert.assertEquals("dataOf1", data, decoded);
+ testMeta.storageManager.teardown();
+ }
+
+ @Test
+ public void testLoad() throws IOException
+ {
+ testMeta.storageManager.setup(testMeta.context);
+ Map<Integer, String> dataOf1 = Maps.newHashMap();
+ dataOf1.put(1, "one");
+ dataOf1.put(2, "two");
+ dataOf1.put(3, "three");
+
+ Map<Integer, String> dataOf2 = Maps.newHashMap();
+ dataOf2.put(4, "four");
+ dataOf2.put(5, "five");
+ dataOf2.put(6, "six");
+
+ testMeta.storageManager.save(dataOf1, 1, 1);
+ testMeta.storageManager.save(dataOf2, 2, 1);
+ testMeta.storageManager.setup(testMeta.context);
+ Map<Integer, Object> decodedStates = testMeta.storageManager.load(1);
+ Assert.assertEquals("no of states", 2, decodedStates.size());
+ for (Integer operatorId : decodedStates.keySet()) {
+ if (operatorId == 1) {
+ Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1));
+ } else {
+ Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2));
+ }
+ }
+ testMeta.storageManager.teardown();
+ }
+
+ @Test
+ public void testRecovery() throws IOException
+ {
+ testMeta.storageManager.setup(testMeta.context);
+ Map<Integer, String> dataOf1 = Maps.newHashMap();
+ dataOf1.put(1, "one");
+ dataOf1.put(2, "two");
+ dataOf1.put(3, "three");
+
+ Map<Integer, String> dataOf2 = Maps.newHashMap();
+ dataOf2.put(4, "four");
+ dataOf2.put(5, "five");
+ dataOf2.put(6, "six");
+
+ testMeta.storageManager.save(dataOf1, 1, 1);
+ testMeta.storageManager.save(dataOf2, 2, 2);
+
+ testMeta.storageManager.setup(testMeta.context);
+ Assert.assertEquals("largest recovery window", 2, testMeta.storageManager.getLargestRecoveryWindow());
+ testMeta.storageManager.teardown();
+ }
+
+ @Test
+ public void testDelete() throws IOException
+ {
+ testMeta.storageManager.setup(testMeta.context);
+ Map<Integer, String> dataOf1 = Maps.newHashMap();
+ dataOf1.put(1, "one");
+ dataOf1.put(2, "two");
+ dataOf1.put(3, "three");
+
+ Map<Integer, String> dataOf2 = Maps.newHashMap();
+ dataOf2.put(4, "four");
+ dataOf2.put(5, "five");
+ dataOf2.put(6, "six");
+
+ Map<Integer, String> dataOf3 = Maps.newHashMap();
+ dataOf2.put(7, "seven");
+ dataOf2.put(8, "eight");
+ dataOf2.put(9, "nine");
+
+ for (int i = 1; i <= 9; ++i) {
+ testMeta.storageManager.save(dataOf1, 1, i);
+ }
+
+ testMeta.storageManager.save(dataOf2, 2, 1);
+ testMeta.storageManager.save(dataOf3, 3, 1);
+
+ testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager),
+ Sets.newHashSet(2, 3));
+ testMeta.storageManager.setup(testMeta.context);
+ testMeta.storageManager.deleteUpTo(1, 6);
+
+ Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath());
+ FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
+ FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1)));
+ Assert.assertEquals("number of windows for 1", 3, fileStatuses.length);
+ TreeSet<String> windows = Sets.newTreeSet();
+ for (FileStatus fileStatus : fileStatuses) {
+ windows.add(fileStatus.getPath().getName());
+ }
+ Assert.assertEquals("window list for 1", Sets.newTreeSet(Arrays.asList("7", "8", "9")), windows);
+ Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2))));
+ Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3))));
+ testMeta.storageManager.teardown();
+ }
+
+ @Test
+ public void testAbsoluteRecoveryPath() throws IOException
+ {
+ testMeta.storageManager.setRecoveryPathRelativeToAppPath(false);
+ long time = System.currentTimeMillis();
+ testMeta.storageManager.setRecoveryPath("target/" + time);
+ testSave();
+ File recoveryDir = new File("target/" + time);
+ Assert.assertTrue("recover path exist", recoveryDir.isDirectory());
+ FileUtils.deleteDirectory(recoveryDir);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dec419fc/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
index 37d55e7..37aa7e7 100644
--- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
+++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
@@ -18,18 +18,21 @@
*/
package com.datatorrent.lib.util;
-import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.IOException;
import java.util.List;
import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
+import org.apache.commons.io.FileUtils;
-import com.datatorrent.api.*;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.OutputPort;
+import com.datatorrent.api.Sink;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
public class TestUtils
{
@@ -51,6 +54,15 @@ public class TestUtils
}
}
+ public static void deleteTargetTestClassFolder(Description description)
+ {
+ try {
+ FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <S extends Sink, T> S setSink(OutputPort<T> port, S sink)
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dec419fc/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
deleted file mode 100644
index 845b992..0000000
--- a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.util;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.TreeSet;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
-
-/**
- * Tests for {@link WindowDataManager}
- */
-public class WindowDataManagerTest
-{
- private static class TestMeta extends TestWatcher
- {
-
- String applicationPath;
- WindowDataManager.FSWindowDataManager storageManager;
- Context.OperatorContext context;
-
- @Override
- protected void starting(Description description)
- {
- super.starting(description);
- storageManager = new WindowDataManager.FSWindowDataManager();
- applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
-
- Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
- attributes.put(DAG.APPLICATION_PATH, applicationPath);
- context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
-
- storageManager.setup(context);
- }
-
- @Override
- protected void finished(Description description)
- {
- storageManager.teardown();
- try {
- FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- @Rule
- public TestMeta testMeta = new TestMeta();
-
- @Test
- public void testLargestRecoveryWindow()
- {
- Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, testMeta.storageManager.getLargestRecoveryWindow());
- }
-
- @Test
- public void testSave() throws IOException
- {
- Map<Integer, String> data = Maps.newHashMap();
- data.put(1, "one");
- data.put(2, "two");
- data.put(3, "three");
- testMeta.storageManager.save(data, 1, 1);
- testMeta.storageManager.setup(testMeta.context);
- @SuppressWarnings("unchecked")
- Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1);
- Assert.assertEquals("dataOf1", data, decoded);
- }
-
- @Test
- public void testLoad() throws IOException
- {
- Map<Integer, String> dataOf1 = Maps.newHashMap();
- dataOf1.put(1, "one");
- dataOf1.put(2, "two");
- dataOf1.put(3, "three");
-
- Map<Integer, String> dataOf2 = Maps.newHashMap();
- dataOf2.put(4, "four");
- dataOf2.put(5, "five");
- dataOf2.put(6, "six");
-
- testMeta.storageManager.save(dataOf1, 1, 1);
- testMeta.storageManager.save(dataOf2, 2, 1);
- testMeta.storageManager.setup(testMeta.context);
- Map<Integer, Object> decodedStates = testMeta.storageManager.load(1);
- Assert.assertEquals("no of states", 2, decodedStates.size());
- for (Integer operatorId : decodedStates.keySet()) {
- if (operatorId == 1) {
- Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1));
- } else {
- Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2));
- }
- }
- }
-
- @Test
- public void testRecovery() throws IOException
- {
- Map<Integer, String> dataOf1 = Maps.newHashMap();
- dataOf1.put(1, "one");
- dataOf1.put(2, "two");
- dataOf1.put(3, "three");
-
- Map<Integer, String> dataOf2 = Maps.newHashMap();
- dataOf2.put(4, "four");
- dataOf2.put(5, "five");
- dataOf2.put(6, "six");
-
- testMeta.storageManager.save(dataOf1, 1, 1);
- testMeta.storageManager.save(dataOf2, 2, 2);
-
- testMeta.storageManager.setup(testMeta.context);
- Assert.assertEquals("largest recovery window", 2, testMeta.storageManager.getLargestRecoveryWindow());
- }
-
- @Test
- public void testDelete() throws IOException
- {
- Map<Integer, String> dataOf1 = Maps.newHashMap();
- dataOf1.put(1, "one");
- dataOf1.put(2, "two");
- dataOf1.put(3, "three");
-
- Map<Integer, String> dataOf2 = Maps.newHashMap();
- dataOf2.put(4, "four");
- dataOf2.put(5, "five");
- dataOf2.put(6, "six");
-
- Map<Integer, String> dataOf3 = Maps.newHashMap();
- dataOf2.put(7, "seven");
- dataOf2.put(8, "eight");
- dataOf2.put(9, "nine");
-
- for (int i = 1; i <= 9; ++i) {
- testMeta.storageManager.save(dataOf1, 1, i);
- }
-
- testMeta.storageManager.save(dataOf2, 2, 1);
- testMeta.storageManager.save(dataOf3, 3, 1);
-
- testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager),
- Sets.newHashSet(2, 3));
- testMeta.storageManager.setup(testMeta.context);
- testMeta.storageManager.deleteUpTo(1, 6);
-
- Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath());
- FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
- FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1)));
- Assert.assertEquals("number of windows for 1", 3, fileStatuses.length);
- TreeSet<String> windows = Sets.newTreeSet();
- for (FileStatus fileStatus : fileStatuses) {
- windows.add(fileStatus.getPath().getName());
- }
- Assert.assertEquals("window list for 1", Sets.newLinkedHashSet(Arrays.asList("7", "8", "9")), windows);
- Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2))));
- Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3))));
- }
-
-}
[2/2] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-2031'
Posted by ti...@apache.org.
Merge branch 'APEXMALHAR-2031'
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c3f6951a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c3f6951a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c3f6951a
Branch: refs/heads/master
Commit: c3f6951a5466f822debbd5cd700ed72f4f48155c
Parents: c5cab8b dec419f
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Mon Mar 28 16:36:36 2016 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Mon Mar 28 16:36:36 2016 -0700
----------------------------------------------------------------------
.../datatorrent/lib/util/WindowDataManager.java | 35 ++-
.../lib/util/FSWindowDataManagerTest.java | 216 +++++++++++++++++++
.../com/datatorrent/lib/util/TestUtils.java | 22 +-
.../lib/util/WindowDataManagerTest.java | 200 -----------------
4 files changed, 267 insertions(+), 206 deletions(-)
----------------------------------------------------------------------