You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/02/19 07:27:38 UTC
[1/6] incubator-apex-malhar git commit: Using iterator to remove
entries instead of directly accessing data structure to avoid concurrent
modification exceptions
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 0a3e00ce0 -> 9c557fca1
Using iterator to remove entries instead of directly accessing data structure to avoid concurrent modification exceptions
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c39f5655
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c39f5655
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c39f5655
Branch: refs/heads/devel-3
Commit: c39f56555831249ce144b29c2835bc68952c1689
Parents: a029c5f
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Fri Feb 5 13:19:17 2016 -0800
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Fri Feb 12 11:35:01 2016 -0800
----------------------------------------------------------------------
.../lib/io/IdempotentStorageManager.java | 25 ++++++++---------
.../datatorrent/lib/util/WindowDataManager.java | 11 +++++---
.../lib/io/IdempotentStorageManagerTest.java | 28 ++++++++++++++------
.../lib/util/WindowDataManagerTest.java | 18 ++++++++++---
4 files changed, 55 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c39f5655/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
index dae417d..4eac924 100644
--- a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
+++ b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
@@ -23,25 +23,23 @@ import java.util.*;
import javax.validation.constraints.NotNull;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.collect.TreeMultimap;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+
import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.annotation.Stateless;
-
-import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
-
import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
/**
* An idempotent storage manager allows an operator to emit the same tuples in every replayed application window. An idempotent agent
@@ -61,6 +59,7 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex
{
/**
* Gets the largest window for which there is recovery data.
+ * @return Returns the window id
*/
long getLargestRecoveryWindow();
@@ -233,13 +232,14 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex
{
//deleting the replay state
if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) {
- Iterator<Long> windowsIterator = replayState.keySet().iterator();
- while (windowsIterator.hasNext()) {
- long lwindow = windowsIterator.next();
+ Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next();
+ long lwindow = windowEntry.getKey();
if (lwindow > windowId) {
break;
}
- for (Integer loperator : replayState.removeAll(lwindow)) {
+ for (Integer loperator : windowEntry.getValue()) {
if (deletedOperators.contains(loperator)) {
storageAgent.delete(loperator, lwindow);
@@ -255,6 +255,7 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex
storageAgent.delete(loperator, lwindow);
}
}
+ iterator.remove();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c39f5655/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
index 26a2e32..7517cd4 100644
--- a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
+++ b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
@@ -61,6 +61,7 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
{
/**
* Gets the largest window for which there is recovery data.
+ * @return Returns the window id
*/
long getLargestRecoveryWindow();
@@ -232,13 +233,14 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
{
//deleting the replay state
if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) {
- Iterator<Long> windowsIterator = replayState.keySet().iterator();
- while (windowsIterator.hasNext()) {
- long lwindow = windowsIterator.next();
+ Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next();
+ long lwindow = windowEntry.getKey();
if (lwindow > windowId) {
break;
}
- for (Integer loperator : replayState.removeAll(lwindow)) {
+ for (Integer loperator : windowEntry.getValue()) {
if (deletedOperators.contains(loperator)) {
storageAgent.delete(loperator, lwindow);
@@ -253,6 +255,7 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
storageAgent.delete(loperator, lwindow);
}
}
+ iterator.remove();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c39f5655/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
index 347dabf..4b29830 100644
--- a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
@@ -20,18 +20,22 @@ package com.datatorrent.lib.io;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Map;
+import java.util.TreeSet;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -40,7 +44,6 @@ import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.annotation.Stateless;
-
import com.datatorrent.lib.helper.OperatorContextTestHelper;
/**
@@ -172,18 +175,27 @@ public class IdempotentStorageManagerTest
dataOf2.put(8, "eight");
dataOf2.put(9, "nine");
- testMeta.storageManager.save(dataOf1, 1, 1);
+ for (int i = 1; i <= 9; ++i) {
+ testMeta.storageManager.save(dataOf1, 1, i);
+ }
+
testMeta.storageManager.save(dataOf2, 2, 1);
testMeta.storageManager.save(dataOf3, 3, 1);
testMeta.storageManager.partitioned(Lists.<IdempotentStorageManager>newArrayList(testMeta.storageManager),
Sets.newHashSet(2, 3));
testMeta.storageManager.setup(testMeta.context);
- testMeta.storageManager.deleteUpTo(1, 1);
+ testMeta.storageManager.deleteUpTo(1, 6);
Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.recoveryPath);
FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
- Assert.assertEquals("no data for 1", 0, fs.listStatus(new Path(appPath, Integer.toString(1))).length);
+ FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1)));
+ Assert.assertEquals("number of windows for 1", 3, fileStatuses.length);
+ TreeSet<String> windows = Sets.newTreeSet();
+ for (FileStatus fileStatus : fileStatuses) {
+ windows.add(fileStatus.getPath().getName());
+ }
+ Assert.assertEquals("window list for 1", Sets.newLinkedHashSet(Arrays.asList("7", "8", "9")), windows);
Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2))));
Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3))));
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c39f5655/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
index fdca73e..845b992 100644
--- a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
@@ -20,7 +20,9 @@ package com.datatorrent.lib.util;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Map;
+import java.util.TreeSet;
import org.junit.Assert;
import org.junit.Rule;
@@ -30,6 +32,7 @@ import org.junit.runner.Description;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -169,18 +172,27 @@ public class WindowDataManagerTest
dataOf2.put(8, "eight");
dataOf2.put(9, "nine");
- testMeta.storageManager.save(dataOf1, 1, 1);
+ for (int i = 1; i <= 9; ++i) {
+ testMeta.storageManager.save(dataOf1, 1, i);
+ }
+
testMeta.storageManager.save(dataOf2, 2, 1);
testMeta.storageManager.save(dataOf3, 3, 1);
testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager),
Sets.newHashSet(2, 3));
testMeta.storageManager.setup(testMeta.context);
- testMeta.storageManager.deleteUpTo(1, 1);
+ testMeta.storageManager.deleteUpTo(1, 6);
Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath());
FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
- Assert.assertEquals("no data for 1", 0, fs.listStatus(new Path(appPath, Integer.toString(1))).length);
+ FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1)));
+ Assert.assertEquals("number of windows for 1", 3, fileStatuses.length);
+ TreeSet<String> windows = Sets.newTreeSet();
+ for (FileStatus fileStatus : fileStatuses) {
+ windows.add(fileStatus.getPath().getName());
+ }
+ Assert.assertEquals("window list for 1", Sets.newLinkedHashSet(Arrays.asList("7", "8", "9")), windows);
Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2))));
Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3))));
}
[4/6] incubator-apex-malhar git commit: APEXMALHAR-1995 Upgrade Apex
Core dependency to 3.3.0
Posted by th...@apache.org.
APEXMALHAR-1995 Upgrade Apex Core dependency to 3.3.0
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5df199d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5df199d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5df199d7
Branch: refs/heads/devel-3
Commit: 5df199d79195641c51adf9b9ab7816b2660bd76f
Parents: 1eb554f
Author: Thomas Weise <th...@datatorrent.com>
Authored: Sat Feb 13 22:05:24 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sat Feb 13 22:05:24 2016 -0800
----------------------------------------------------------------------
checkstyle-suppressions.xml | 27 +++++++++++++++++++++++++++
pom.xml | 7 ++++---
2 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5df199d7/checkstyle-suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle-suppressions.xml b/checkstyle-suppressions.xml
new file mode 100644
index 0000000..08763a3
--- /dev/null
+++ b/checkstyle-suppressions.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!DOCTYPE suppressions PUBLIC
+ "-//Puppy Crawl//DTD Suppressions 1.0//EN"
+ "http://www.puppycrawl.com/dtds/suppressions_1_0.dtd">
+
+<suppressions>
+</suppressions>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5df199d7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a5c2772..7d804f6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.apex</groupId>
<artifactId>apex</artifactId>
- <version>3.2.0-incubating</version>
+ <version>3.3.0-incubating</version>
</parent>
<groupId>org.apache.apex</groupId>
@@ -49,8 +49,9 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.deploy.skip>false</maven.deploy.skip>
- <apex.core.version>3.2.0-incubating</apex.core.version>
+ <apex.core.version>3.3.0-incubating</apex.core.version>
<semver.plugin.skip>false</semver.plugin.skip>
+ <checkstyle.console>false</checkstyle.console>
</properties>
<build>
@@ -93,7 +94,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
- <maxAllowedViolations>8252</maxAllowedViolations>
+ <maxAllowedViolations>15967</maxAllowedViolations>
</configuration>
</plugin>
</plugins>
[2/6] incubator-apex-malhar git commit: Fixes for the following issues
Posted by th...@apache.org.
Fixes for the following issues
Committed offsets are not present in offset manager storage.
Operator partitions are reporting offsets to stats listener for kafka partitions they don't subscribe to.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5775a539
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5775a539
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5775a539
Branch: refs/heads/devel-3
Commit: 5775a53904399b561335182bc822ecbc6f45f787
Parents: cec33da
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Tue Feb 9 20:52:56 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Fri Feb 12 11:58:38 2016 -0800
----------------------------------------------------------------------
.../kafka/AbstractKafkaInputOperator.java | 21 ++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5775a539/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 4b22e5e..b166b9e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -643,7 +643,9 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
if (p.getPartitionedInstance().getConsumer() instanceof SimpleKafkaConsumer) {
p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
if (initOffsets != null) {
- p.getPartitionedInstance().offsetStats.putAll(initOffsets);
+ //Don't send all offsets to all partitions
+ //p.getPartitionedInstance().offsetStats.putAll(initOffsets);
+ p.getPartitionedInstance().offsetStats.putAll(p.getPartitionedInstance().getConsumer().getCurrentOffsets());
}
}
newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
@@ -715,7 +717,11 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
{
//In every partition check interval, call offsetmanager to update the offsets
if (offsetManager != null) {
- offsetManager.updateOffsets(getOffsetsForPartitions(kstats));
+ Map<KafkaPartition, Long> offsetsForPartitions = getOffsetsForPartitions(kstats);
+ if (offsetsForPartitions.size() > 0) {
+ logger.debug("Passing offset updates to offset manager");
+ offsetManager.updateOffsets(offsetsForPartitions);
+ }
}
}
@@ -743,15 +749,18 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
long t = System.currentTimeMillis();
+ // If stats are available then update offsets
+ // Do this before re-partition interval check below to not miss offset updates
+ if (kstats.size() > 0) {
+ logger.debug("Checking offset updates for offset manager");
+ updateOffsets(kstats);
+ }
+
if (t - lastCheckTime < repartitionCheckInterval) {
// return false if it's within repartitionCheckInterval since last time it check the stats
return false;
}
- logger.debug("Use OffsetManager to update offsets");
- updateOffsets(kstats);
-
-
if(repartitionInterval < 0){
// if repartition is disabled
return false;
[5/6] incubator-apex-malhar git commit: Merge branch 'MLHR-1962' of
https://github.com/shubham-pathak22/incubator-apex-malhar
Posted by th...@apache.org.
Merge branch 'MLHR-1962' of https://github.com/shubham-pathak22/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/1afb4a26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/1afb4a26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/1afb4a26
Branch: refs/heads/devel-3
Commit: 1afb4a26fa0f6447d54a1f21705c3dd9a4a4f489
Parents: 1eb554f 0a3e00c
Author: chinmaykolhatkar <ch...@datatorrent.com>
Authored: Tue Feb 16 12:30:25 2016 +0530
Committer: chinmaykolhatkar <ch...@datatorrent.com>
Committed: Tue Feb 16 12:30:25 2016 +0530
----------------------------------------------------------------------
contrib/pom.xml | 6 +
.../datatorrent/contrib/parser/JsonParser.java | 247 +++++++++++
.../parser/JsonParserApplicationTest.java | 93 ++++
.../contrib/parser/JsonParserTest.java | 443 +++++++++++++++++++
.../src/test/resources/json-parser-schema.json | 51 +++
.../com/datatorrent/lib/parser/JsonParser.java | 110 -----
.../datatorrent/lib/parser/JsonParserTest.java | 228 ----------
7 files changed, 840 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
[6/6] incubator-apex-malhar git commit: Merge branch 'master' of
github.com:tweise/incubator-apex-malhar
Posted by th...@apache.org.
Merge branch 'master' of github.com:tweise/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9c557fca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9c557fca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9c557fca
Branch: refs/heads/devel-3
Commit: 9c557fca11b9dcdada94898f4d392649555f2ed2
Parents: 1afb4a2 5df199d
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Tue Feb 16 17:28:42 2016 -0800
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Tue Feb 16 17:28:42 2016 -0800
----------------------------------------------------------------------
checkstyle-suppressions.xml | 27 +++++++++++++++++++++++++++
pom.xml | 7 ++++---
2 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
[3/6] incubator-apex-malhar git commit: Merge branch
'APEXMALHAR-1990' of
https://github.com/PramodSSImmaneni/incubator-apex-malhar
Posted by th...@apache.org.
Merge branch 'APEXMALHAR-1990' of https://github.com/PramodSSImmaneni/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/1eb554f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/1eb554f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/1eb554f0
Branch: refs/heads/devel-3
Commit: 1eb554f03828f04f2b54765dc38ac96b8f31ec58
Parents: 5775a53 c39f565
Author: Thomas Weise <th...@datatorrent.com>
Authored: Fri Feb 12 12:37:24 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Fri Feb 12 12:37:24 2016 -0800
----------------------------------------------------------------------
.../lib/io/IdempotentStorageManager.java | 25 ++++++++---------
.../datatorrent/lib/util/WindowDataManager.java | 11 +++++---
.../lib/io/IdempotentStorageManagerTest.java | 28 ++++++++++++++------
.../lib/util/WindowDataManagerTest.java | 18 ++++++++++---
4 files changed, 55 insertions(+), 27 deletions(-)
----------------------------------------------------------------------