You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/08 14:27:30 UTC

[1/4] flink git commit: [hotfix] [tests] Clean up (mostly redundant) PowerMock runners, preparations, and exclusions

Repository: flink
Updated Branches:
  refs/heads/master c21ecaed2 -> 6a6eeb9d5


[hotfix] [tests] Clean up (mostly redundant) PowerMock runners, preparations, and exclusions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6a6eeb9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6a6eeb9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6a6eeb9d

Branch: refs/heads/master
Commit: 6a6eeb9d53c96772de8c0b5f9684128a9d8e6285
Parents: d9ddcb4
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 7 18:54:35 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 15:26:42 2016 +0100

----------------------------------------------------------------------
 .../flink/storm/wrappers/BoltWrapperTest.java   |  2 +-
 .../flink/storm/wrappers/SpoutWrapperTest.java  |  2 +-
 .../storm/wrappers/WrapperSetupHelperTest.java  |  2 +-
 .../runtime/operators/DataSinkTaskTest.java     | 10 ------
 .../runtime/operators/DataSourceTaskTest.java   | 33 +++++++-------------
 .../operators/chaining/ChainTaskTest.java       | 11 +------
 .../cassandra/CassandraConnectorITCase.java     |  3 --
 .../operators/GenericWriteAheadSinkTest.java    | 11 ++-----
 .../operators/WriteAheadSinkTestBase.java       | 12 ++-----
 9 files changed, 22 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6a6eeb9d/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index a7d60a6..61a88ad 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -69,7 +69,7 @@ import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({StreamElementSerializer.class, WrapperSetupHelper.class, StreamRecord.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"})
 public class BoltWrapperTest extends AbstractTest {
 
 	@Test(expected = IllegalArgumentException.class)

http://git-wip-us.apache.org/repos/asf/flink/blob/6a6eeb9d/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
index e50ff5a..dc84b33 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
@@ -53,7 +53,7 @@ import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(WrapperSetupHelper.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"})
 public class SpoutWrapperTest extends AbstractTest {
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })

http://git-wip-us.apache.org/repos/asf/flink/blob/6a6eeb9d/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index 000fe84..b4e153a 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -58,7 +58,7 @@ import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(WrapperSetupHelper.class)
-@PowerMockIgnore("javax.*")
+@PowerMockIgnore({"javax.*", "org.apache.log4j.*"})
 public class WrapperSetupHelperTest extends AbstractTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/6a6eeb9d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index a41c25b..226dc91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -22,24 +22,17 @@ import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,9 +49,6 @@ import java.util.Set;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({Task.class, ResultPartitionWriter.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class DataSinkTaskTest extends TaskTestBase {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/6a6eeb9d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 0388c2b..1ebdcb0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -19,21 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-
 import org.apache.flink.api.common.io.DelimitedInputFormat;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.junit.Assert;
-
 import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -41,16 +27,21 @@ import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({Task.class, ResultPartitionWriter.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
 public class DataSourceTaskTest extends TaskTestBase {
 
 	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;

http://git-wip-us.apache.org/repos/asf/flink/blob/6a6eeb9d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 02c420c..0c9fd79 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -27,9 +27,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
 import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.FlatMapDriver;
@@ -39,20 +37,13 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({Task.class, ResultPartitionWriter.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 
 public class ChainTaskTest extends TaskTestBase {
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/6a6eeb9d/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index d58b0af..a29e881 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -77,9 +77,6 @@ import java.util.UUID;
 import static org.junit.Assert.*;
 
 @SuppressWarnings("serial")
-@PowerMockIgnore("javax.management.*")
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
 public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/6a6eeb9d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
index e5e26e9..e186be0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
@@ -15,29 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.ArrayList;
 import java.util.List;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Integer>, GenericWriteAheadSinkTest.ListSink> {
+
 	@Override
 	protected ListSink createSink() throws Exception {
 		return new ListSink();

http://git-wip-us.apache.org/repos/asf/flink/blob/6a6eeb9d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
index 3d1e6e8..ab84bc1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
@@ -15,23 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore("javax.management.*")
+
 public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> extends TestLogger {
 
 	protected abstract S createSink() throws Exception;


[3/4] flink git commit: [hotfix] [RocksDB backend] Minor cleanups to constructors and comments in RocksDBStateBackend

Posted by se...@apache.org.
[hotfix] [RocksDB backend] Minor cleanups to constructors and comments in RocksDBStateBackend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9ddcb4c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9ddcb4c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9ddcb4c

Branch: refs/heads/master
Commit: d9ddcb4c297082ef9544c1b5d94c75267662b4c6
Parents: d450bee
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 7 18:39:32 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 15:26:42 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    | 34 ++++++++------------
 .../state/RocksDBAsyncSnapshotTest.java         |  6 ++--
 .../state/RocksDBStateBackendConfigTest.java    |  3 +-
 .../state/RocksDBStateBackendTest.java          |  2 +-
 .../flink/cep/operator/CEPOperatorTest.java     | 10 ++----
 .../EventTimeWindowCheckpointingITCase.java     |  3 +-
 .../test/state/ManualWindowSpeedITCase.java     | 11 +++----
 7 files changed, 26 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 37c6312..9ba0dc1 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -29,15 +29,15 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -134,29 +134,23 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
 	public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
-		// creating the FsStateBackend automatically sanity checks the URI
-		FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri);
-
-		this.checkpointStreamBackend = fsStateBackend;
-	}
-
-
-	public RocksDBStateBackend(String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
-		this(new Path(checkpointDataUri).toUri(), nonPartitionedStateBackend);
+		this(new FsStateBackend(checkpointDataUri));
 	}
 
-	public RocksDBStateBackend(URI checkpointDataUri, AbstractStateBackend checkpointStreamBackend) throws IOException {
+	/**
+	 * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its
+	 * checkpoint data streams. Typically, one would supply a filesystem or database state backend
+	 * here where the snapshots from RocksDB would be stored.
+	 * 
+	 * <p>The snapshots of the RocksDB state will be stored using the given backend's
+	 * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}. 
+	 * 
+	 * @param checkpointStreamBackend The backend to store the checkpoint data streams (RocksDB snapshots) in.
+	 */
+	public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
 		this.checkpointStreamBackend = requireNonNull(checkpointStreamBackend);
 	}
 
-	private void writeObject(ObjectOutputStream oos) throws IOException {
-		oos.defaultWriteObject();
-	}
-
-	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-		ois.defaultReadObject();
-		isInitialized = false;
-	}
 	// ------------------------------------------------------------------------
 	//  State backend methods
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 1c5a91c..f3da93e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -119,9 +119,8 @@ public class RocksDBAsyncSnapshotTest {
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 
 		File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
-		File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
 
-		RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
+		RocksDBStateBackend backend = new RocksDBStateBackend(new MemoryStateBackend());
 		backend.setDbStoragePath(dbDir.getAbsolutePath());
 
 		streamConfig.setStateBackend(backend);
@@ -220,11 +219,10 @@ public class RocksDBAsyncSnapshotTest {
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 
 		File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
-		File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
 
 		BlockingStreamMemoryStateBackend memoryStateBackend = new BlockingStreamMemoryStateBackend();
 
-		RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), memoryStateBackend);
+		RocksDBStateBackend backend = new RocksDBStateBackend(memoryStateBackend);
 		backend.setDbStoragePath(dbDir.getAbsolutePath());
 
 		streamConfig.setStateBackend(backend);

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 07fc27c..bf9b315 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -343,8 +343,7 @@ public class RocksDBStateBackendConfigTest {
 	@Test
 	public void testCallsForwardedToNonPartitionedBackend() throws Exception {
 		AbstractStateBackend nonPartBackend = mock(AbstractStateBackend.class);
-		String checkpointPath = tempFolder.newFolder().toURI().toString();
-		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath, nonPartBackend);
+		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(nonPartBackend);
 
 		Environment env = getMockEnvironment();
 		rocksDbBackend.createStreamFactory(env.getJobID(), "foobar");

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 9222f0b..9d25434 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -45,7 +45,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	protected RocksDBStateBackend getStateBackend() throws IOException {
 		String dbPath = tempFolder.newFolder().getAbsolutePath();
 		String checkpointPath = tempFolder.newFolder().toURI().toString();
-		RocksDBStateBackend backend = new RocksDBStateBackend(checkpointPath, new FsStateBackend(checkpointPath));
+		RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath));
 		backend.setDbStoragePath(dbPath);
 		return backend;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 0f49b13..6cffd9c 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -301,9 +301,7 @@ public class CEPOperatorTest extends TestLogger {
 	public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception {
 
 		String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
-		String rocksDbBackups = tempFolder.newFolder().toURI().toString();
-		RocksDBStateBackend rocksDBStateBackend =
-				new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
 		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 
 		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
@@ -350,8 +348,7 @@ public class CEPOperatorTest extends TestLogger {
 				keySelector,
 				BasicTypeInfo.INT_TYPE_INFO);
 
-		rocksDBStateBackend =
-				new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+		rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
 		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 		harness.setStateBackend(rocksDBStateBackend);
 
@@ -381,8 +378,7 @@ public class CEPOperatorTest extends TestLogger {
 				keySelector,
 				BasicTypeInfo.INT_TYPE_INFO);
 
-		rocksDBStateBackend =
-				new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+		rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
 		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 		harness.setStateBackend(rocksDBStateBackend);
 		harness.setup();

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 4dbf5cb..4f28d8c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -120,8 +120,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 			}
 			case ROCKSDB_FULLY_ASYNC: {
 				String rocksDb = tempFolder.newFolder().getAbsolutePath();
-				String rocksDbBackups = tempFolder.newFolder().toURI().toString();
-				RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend(MAX_MEM_STATE_SIZE));
+				RocksDBStateBackend rdb = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
 				rdb.setDbStoragePath(rocksDb);
 				this.stateBackend = rdb;
 				break;

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
index 428c47c..456861a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
@@ -127,9 +127,8 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase {
 
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 		env.setParallelism(1);
-
-		String checkpoints = tempFolder.newFolder().toURI().toString();
-		env.setStateBackend(new RocksDBStateBackend(checkpoints, new MemoryStateBackend()));
+		
+		env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
 
 		env.addSource(new InfiniteTupleSource(10_000))
 				.keyBy(0)
@@ -163,8 +162,7 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase {
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 		env.setParallelism(1);
 
-		String rocksDbBackups = tempFolder.newFolder().toURI().toString();
-		env.setStateBackend(new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()));
+		env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
 
 		env.addSource(new InfiniteTupleSource(10_000))
 				.keyBy(0)
@@ -199,8 +197,7 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase {
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 		env.setParallelism(1);
 
-		String rocksDbBackups = tempFolder.newFolder().toURI().toString();
-		env.setStateBackend(new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()));
+		env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
 
 		env.addSource(new InfiniteTupleSource(10_000))
 				.keyBy(0)


[4/4] flink git commit: [FLINK-5022] Suppress RejectedExecutionExceptions if the ExecutorService has been shut down

Posted by se...@apache.org.
[FLINK-5022] Suppress RejectedExecutionExceptions if the ExecutorService has been shut down

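This PR suppresses RejectedExecutionExceptions that are reported after an ExecutorService has been
shut down. The suppression only applies when the executor is an ExecutorService; failures reported
for any other kind of Executor, or while the ExecutorService is still running, are logged as warnings.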

This closes #2757


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3908241
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3908241
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3908241

Branch: refs/heads/master
Commit: d3908241323f5de3d454c7daf560faed49b3c779
Parents: c21ecae
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Nov 4 11:16:47 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 15:26:42 2016 +0100

----------------------------------------------------------------------
 .../runtime/concurrent/impl/FlinkFuture.java    | 27 ++++++++++++++++++--
 .../flink/runtime/jobmanager/JobManager.scala   |  5 ++--
 2 files changed, 27 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d3908241/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index b678c5e..9783d4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -23,6 +23,7 @@ import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import akka.dispatch.Recover;
+import akka.japi.Procedure;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
@@ -30,6 +31,8 @@ import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
 import scala.Tuple2;
 import scala.concurrent.Await;
@@ -43,6 +46,7 @@ import scala.util.Try;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -53,6 +57,8 @@ import java.util.concurrent.TimeoutException;
  */
 public class FlinkFuture<T> implements Future<T> {
 
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkFuture.class);
+
 	protected scala.concurrent.Future<T> scalaFuture;
 
 	FlinkFuture() {
@@ -335,8 +341,25 @@ public class FlinkFuture<T> implements Future<T> {
 	// Helper functions and types
 	//-----------------------------------------------------------------------------------
 
-	private static ExecutionContext createExecutionContext(Executor executor) {
-		return ExecutionContexts$.MODULE$.fromExecutor(executor);
+	private static ExecutionContext createExecutionContext(final Executor executor) {
+		return ExecutionContexts$.MODULE$.fromExecutor(executor, new Procedure<Throwable>() {
+			@Override
+			public void apply(Throwable throwable) throws Exception {
+				if (executor instanceof ExecutorService) {
+					ExecutorService executorService = (ExecutorService) executor;
+					// only log the exception if the executor service is still running
+					if (!executorService.isShutdown()) {
+						logThrowable(throwable);
+					}
+				} else {
+					logThrowable(throwable);
+				}
+			}
+
+			private void logThrowable(Throwable throwable) {
+				LOG.warn("Uncaught exception in execution context.", throwable);
+			}
+		});
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/d3908241/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 3f0689f..68e71ef 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import java.io.{File, IOException}
 import java.net._
 import java.util.UUID
-import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
+import java.util.concurrent.{ExecutorService, ForkJoinPool, TimeUnit, TimeoutException}
 
 import akka.actor.Status.{Failure, Success}
 import akka.actor._
@@ -62,7 +62,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.Disconnect
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
-import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
@@ -87,7 +87,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent._
 import scala.concurrent.duration._
-import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
 
 /**


[2/4] flink git commit: [FLINK-5014] [RocksDB backend] Add toString for RocksDBStateBackend

Posted by se...@apache.org.
[FLINK-5014] [RocksDB backend] Add toString for RocksDBStateBackend

This closes #2760


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d450beec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d450beec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d450beec

Branch: refs/heads/master
Commit: d450beece2f5f76d7f519101316f850e6fa4924a
Parents: d390824
Author: Andrey Melentyev <an...@gmail.com>
Authored: Sat Nov 5 10:18:38 2016 +0300
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 15:26:42 2016 +0100

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBStateBackend.java     | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d450beec/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 82e7899..37c6312 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -40,6 +40,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
@@ -447,4 +448,14 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		}
 		return columnOptions;
 	}
+
+	@Override
+	public String toString() {
+		return "RocksDB State Backend {" +
+			"isInitialized=" + isInitialized +
+			", configuredDbBasePaths=" + Arrays.toString(configuredDbBasePaths) +
+			", initializedDbBasePaths=" + Arrays.toString(initializedDbBasePaths) +
+			", checkpointStreamBackend=" + checkpointStreamBackend +
+			'}';
+	}
 }