You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/29 18:36:55 UTC

[1/4] flink git commit: [FLINK-1795] [runtime] Add test for duplicate elimination in the solution set.

Repository: flink
Updated Branches:
  refs/heads/master 1c1562ab7 -> d03dd63b7


[FLINK-1795] [runtime] Add test for duplicate elimination in the solution set.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/923a2ae2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/923a2ae2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/923a2ae2

Branch: refs/heads/master
Commit: 923a2ae259bd72a2d48639ae0e64db0a04a4aa91
Parents: 1c1562a
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 27 17:44:21 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:34:22 2015 +0200

----------------------------------------------------------------------
 .../iterative/task/IterationHeadPactTask.java   | 36 +++-----
 .../operators/hash/CompactingHashTable.java     | 10 +++
 .../iterative/SolutionSetDuplicatesITCase.java  | 87 ++++++++++++++++++++
 3 files changed, 110 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/923a2ae2/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index abaf311..cf02bdf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -85,20 +85,14 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 
 	private Collector<X> finalOutputCollector;
 
-	private List<RecordWriter<?>> finalOutputWriters;
-
 	private TypeSerializerFactory<Y> feedbackTypeSerializer;
 
 	private TypeSerializerFactory<X> solutionTypeSerializer;
 
 	private ResultPartitionWriter toSync;
 
-	private int initialSolutionSetInput; // undefined for bulk iterations
-
 	private int feedbackDataInput; // workset or bulk partial solution
 
-	private RuntimeAggregatorRegistry aggregatorRegistry;
-
 	// --------------------------------------------------------------------------------------------
 
 	@Override
@@ -115,15 +109,15 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 
 		// at this time, the outputs to the step function are created
 		// add the outputs for the final solution
-		this.finalOutputWriters = new ArrayList<RecordWriter<?>>();
+		List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
 		final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
 		final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
 		this.finalOutputCollector = RegularPactTask.getOutputCollector(this, finalOutConfig,
-			userCodeClassLoader, this.finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());
+			userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());
 
 		// sanity check the setup
 		final int writersIntoStepFunction = this.eventualOutputs.size();
-		final int writersIntoFinalResult = this.finalOutputWriters.size();
+		final int writersIntoFinalResult = finalOutputWriters.size();
 		final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();
 
 		if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
@@ -207,13 +201,12 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 		TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
 		TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
 		
-		JoinHashMap<BT> map = new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
-		return map;
+		return new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
 	}
 	
 	private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
 		solutionSet.open();
-		solutionSet.buildTable(solutionSetInput);
+		solutionSet.buildTableWithUniqueKey(solutionSetInput);
 	}
 	
 	private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
@@ -255,14 +248,13 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 			SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;
 
 			feedbackDataInput = config.getIterationHeadPartialSolutionOrWorksetInputIndex();
-			feedbackTypeSerializer = this.<Y>getInputSerializer(feedbackDataInput);
+			feedbackTypeSerializer = this.getInputSerializer(feedbackDataInput);
 			excludeFromReset(feedbackDataInput);
 
+			int initialSolutionSetInput;
 			if (isWorksetIteration) {
 				initialSolutionSetInput = config.getIterationHeadSolutionSetInputIndex();
-				TypeSerializerFactory<X> solutionTypeSerializerFactory = config
-						.getSolutionSetSerializer(getUserCodeClassLoader());
-				solutionTypeSerializer = solutionTypeSerializerFactory;
+				solutionTypeSerializer = config.getSolutionSetSerializer(getUserCodeClassLoader());
 
 				// setup the index for the solution set
 				@SuppressWarnings("unchecked")
@@ -283,10 +275,9 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 					solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
 					SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier);
 				}
-			} else {
+			}
+			else {
 				// bulk iteration case
-				initialSolutionSetInput = -1;
-
 				@SuppressWarnings("unchecked")
 				TypeSerializerFactory<X> solSer = (TypeSerializerFactory<X>) feedbackTypeSerializer;
 				solutionTypeSerializer = solSer;
@@ -299,7 +290,7 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 			}
 
 			// instantiate all aggregators and register them at the iteration global registry
-			aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators
+			RuntimeAggregatorRegistry aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators
 					(getUserCodeClassLoader()));
 			IterationAggregatorBroker.instance().handIn(brokerKey, aggregatorRegistry);
 
@@ -392,7 +383,6 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 
 			if (solutionSet != null) {
 				solutionSet.close();
-				solutionSet = null;
 			}
 		}
 	}
@@ -434,8 +424,8 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 			log.debug(formatLogString("Sending end-of-superstep to all iteration outputs."));
 		}
 
-		for (int outputIndex = 0; outputIndex < this.eventualOutputs.size(); outputIndex++) {
-			this.eventualOutputs.get(outputIndex).sendEndOfSuperstep();
+		for (RecordWriter<?> eventualOutput : this.eventualOutputs) {
+			eventualOutput.sendEndOfSuperstep();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/923a2ae2/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 7107972..6533e19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -337,6 +337,16 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			insert(record);
 		}
 	}
+
+	public void buildTableWithUniqueKey(final MutableObjectIterator<T> input) throws IOException {
+		T record = this.buildSideSerializer.createInstance();
+		T tmp = this.buildSideSerializer.createInstance();
+
+		// go over the complete input; insert each element, replacing any earlier record with the same key
+		while (this.running && ((record = input.next(record)) != null)) {
+			insertOrReplaceRecord(record, tmp);
+		}
+	}
 	
 	public final void insert(T record) throws IOException {
 		if(this.closed.get()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/923a2ae2/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
new file mode 100644
index 0000000..c987dfd
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class SolutionSetDuplicatesITCase extends MultipleProgramsTestBase {
+
+	public SolutionSetDuplicatesITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testProgram() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Tuple2<Long, Long>> data = env
+				.generateSequence(0, 10)
+				.flatMap(new FlatMapFunction<Long, Tuple2<Long, Long>>() {
+					@Override
+					public void flatMap(Long value, Collector<Tuple2<Long, Long>> out) {
+						out.collect(new Tuple2<Long, Long>(value, value));
+						out.collect(new Tuple2<Long, Long>(value, value));
+						out.collect(new Tuple2<Long, Long>(value, value));
+					}
+				})
+				.rebalance();
+
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = data.iterateDelta(data, 10, 0);
+
+			List<Integer> result = iter
+					.closeWith(iter.getWorkset(), iter.getWorkset())
+					.map(new MapFunction<Tuple2<Long,Long>, Integer>() {
+						@Override
+						public Integer map(Tuple2<Long, Long> value) {
+							return value.f0.intValue();
+						}
+					})
+					.collect();
+
+			assertEquals(11, result.size());
+
+			Collections.sort(result);
+			assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10), result);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}


[4/4] flink git commit: [FLINK-1796] [jobmanager] In local mode, the embedded TaskManager is watched by a process reaper as well.

Posted by se...@apache.org.
[FLINK-1796] [jobmanager] In local mode, the embedded TaskManager is watched by a process reaper as well.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c321425
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c321425
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c321425

Branch: refs/heads/master
Commit: 8c32142528590a030693529c7c8d93f194968c0a
Parents: d6ea1f2
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 27 20:26:33 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:34:32 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/process/ProcessReaper.java | 2 +-
 .../org/apache/flink/runtime/jobmanager/JobManager.scala     | 8 +++++++-
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c321425/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
index b12b82d..5ab550f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
@@ -47,7 +47,7 @@ public class ProcessReaper extends UntypedActor {
 		if (message instanceof Terminated) {
 			try {
 				Terminated term = (Terminated) message;
-				String name = term.actor().path().name();
+				String name = term.actor().path().toSerializationFormat();
 				if (log != null) {
 					log.error("Actor " + name + " terminated, stopping process...");
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c321425/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2233dbf..9aa476d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -769,8 +769,14 @@ object JobManager {
       if (executionMode == JobManagerMode.LOCAL) {
         LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")
 
-        TaskManager.startTaskManagerActor(configuration, jobManagerSystem, listeningAddress,
+        val taskManagerActor = TaskManager.startTaskManagerActor(
+          configuration, jobManagerSystem, listeningAddress,
           TaskManager.TASK_MANAGER_NAME, true, true, classOf[TaskManager])
+
+        LOG.debug("Starting TaskManager process reaper")
+        jobManagerSystem.actorOf(
+          Props(classOf[ProcessReaper], taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
+          "TaskManager_Process_Reaper")
       }
 
       // start the job manager web frontend


[3/4] flink git commit: [tests] Add template log4j.properties files for tests

Posted by se...@apache.org.
[tests] Add template log4j.properties files for tests

Also clean up some unused imports.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d03dd63b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d03dd63b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d03dd63b

Branch: refs/heads/master
Commit: d03dd63b7034d3bdcef6df282b5277ba9d6ed1c4
Parents: 8c32142
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Mar 29 16:55:06 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:34:32 2015 +0200

----------------------------------------------------------------------
 .../src/test/resources/log4j-test.properties    | 15 ++++++-----
 .../src/test/resources/log4j-test.properties    | 10 +++++++-
 .../src/test/resources/log4j-test.properties    | 13 +++++++++-
 .../src/test/resources/log4j.properties         | 27 --------------------
 .../StreamCheckpointingITCase.java              |  9 ++++---
 .../src/test/resources/log4j-test.properties    |  2 +-
 6 files changed, 36 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d03dd63b/flink-clients/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/resources/log4j-test.properties b/flink-clients/src/test/resources/log4j-test.properties
index 04ec355..85897b3 100644
--- a/flink-clients/src/test/resources/log4j-test.properties
+++ b/flink-clients/src/test/resources/log4j-test.properties
@@ -16,12 +16,15 @@
 # limitations under the License.
 ################################################################################
 
-# Set root logger level to OFF and its only appender to A1.
-log4j.rootLogger=OFF, A1
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
 
 # A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d03dd63b/flink-core/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/log4j-test.properties b/flink-core/src/test/resources/log4j-test.properties
index 2fb9345..4c74d85 100644
--- a/flink-core/src/test/resources/log4j-test.properties
+++ b/flink-core/src/test/resources/log4j-test.properties
@@ -16,4 +16,12 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF
\ No newline at end of file
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/d03dd63b/flink-optimizer/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/resources/log4j-test.properties b/flink-optimizer/src/test/resources/log4j-test.properties
index 2fb9345..85897b3 100644
--- a/flink-optimizer/src/test/resources/log4j-test.properties
+++ b/flink-optimizer/src/test/resources/log4j-test.properties
@@ -16,4 +16,15 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF
\ No newline at end of file
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d03dd63b/flink-optimizer/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/resources/log4j.properties b/flink-optimizer/src/test/resources/log4j.properties
deleted file mode 100644
index fa3f937..0000000
--- a/flink-optimizer/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target  = System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d03dd63b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 797555a..10773f5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -28,17 +28,18 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
 import org.apache.flink.util.Collector;
-import org.junit.Test;
-import org.junit.BeforeClass;
 import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.

http://git-wip-us.apache.org/repos/asf/flink/blob/d03dd63b/flink-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties
index 0845c81..85897b3 100644
--- a/flink-tests/src/test/resources/log4j-test.properties
+++ b/flink-tests/src/test/resources/log4j-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender


[2/4] flink git commit: [tests] Remove incomplete and unused LineRankITCase.

Posted by se...@apache.org.
[tests] Remove incomplete and unused LineRankITCase.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6ea1f22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6ea1f22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6ea1f22

Branch: refs/heads/master
Commit: d6ea1f227a309a04bfef2d051d8240736fb186f0
Parents: 923a2ae
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 27 20:01:20 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:34:32 2015 +0200

----------------------------------------------------------------------
 .../flink/test/iterative/LineRankITCase.java    | 92 --------------------
 1 file changed, 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d6ea1f22/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java
deleted file mode 100644
index fa13656..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java
+++ /dev/null
@@ -1,92 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.test.iterative;
-//
-//import java.util.Collection;
-//
-//import org.apache.flink.api.common.Plan;
-//import org.apache.flink.configuration.Configuration;
-//import org.apache.flink.examples.scala.graph.LineRank;
-//import org.apache.flink.test.util.RecordAPITestBase;
-//import org.junit.runner.RunWith;
-//import org.junit.runners.Parameterized;
-//import org.junit.runners.Parameterized.Parameters;
-//
-//@RunWith(Parameterized.class)
-//public class LineRankITCase extends RecordAPITestBase {
-//
-//	private static final String SOURCE_INCIDENCE = "1,1,1\n" +
-//	                                               "2,1,1\n" +
-//	                                               "3,1,1\n" +
-//	                                               "4,2,1\n" +
-//	                                               "5,3,1\n" +
-//	                                               "6,3,1\n" +
-//	                                               "7,4,1\n" +
-//	                                               "8,4,1\n" +
-//	                                               "9,5,1\n";
-//
-//	private static final String TARGET_INCIDENCE = "1,2,1\n" +
-//	                                               "2,3,1\n" +
-//	                                               "3,4,1\n" +
-//	                                               "4,3,1\n" +
-//	                                               "5,2,1\n" +
-//	                                               "6,5,1\n" +
-//	                                               "7,1,1\n" +
-//	                                               "8,3,1\n" +
-//	                                               "9,4,1\n";
-//
-//	protected String sourcesPath;
-//	protected String targetsPath;
-//	protected String resultPath;
-//
-//
-//	public LineRankITCase(Configuration config) {
-//		super(config);
-//		setTaskManagerNumSlots(parallelism);
-//	}
-//
-//	@Override
-//	protected void preSubmit() throws Exception {
-//		sourcesPath = createTempFile("sourceIncidence.txt", SOURCE_INCIDENCE);
-//		targetsPath = createTempFile("targetIncidence.txt", TARGET_INCIDENCE);
-//		resultPath = getTempFilePath("results");
-//	}
-//
-//	@Override
-//	protected Plan getTestJob() {
-//		LineRank lr = new LineRank();
-//
-//		Plan plan = lr.getScalaPlan(
-//			config.getInteger("NumSubtasks", 1),
-//			sourcesPath,
-//			targetsPath,
-//			9,
-//			resultPath);
-//		return plan;
-//	}
-//
-//	@Parameters
-//	public static Collection<Object[]> getConfigurations() {
-//		Configuration config1 = new Configuration();
-//		config1.setInteger("NumSubtasks", parallelism);
-//		config1.setInteger("NumIterations", 5);
-//		return toParameterList(config1);
-//	}
-//}