You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/29 18:36:55 UTC
[1/4] flink git commit: [FLINK-1795] [runtime] Add test for duplicate
elimination in the solution set.
Repository: flink
Updated Branches:
refs/heads/master 1c1562ab7 -> d03dd63b7
[FLINK-1795] [runtime] Add test for duplicate elimination in the solution set.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/923a2ae2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/923a2ae2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/923a2ae2
Branch: refs/heads/master
Commit: 923a2ae259bd72a2d48639ae0e64db0a04a4aa91
Parents: 1c1562a
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 27 17:44:21 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:34:22 2015 +0200
----------------------------------------------------------------------
.../iterative/task/IterationHeadPactTask.java | 36 +++-----
.../operators/hash/CompactingHashTable.java | 10 +++
.../iterative/SolutionSetDuplicatesITCase.java | 87 ++++++++++++++++++++
3 files changed, 110 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/923a2ae2/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index abaf311..cf02bdf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -85,20 +85,14 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
private Collector<X> finalOutputCollector;
- private List<RecordWriter<?>> finalOutputWriters;
-
private TypeSerializerFactory<Y> feedbackTypeSerializer;
private TypeSerializerFactory<X> solutionTypeSerializer;
private ResultPartitionWriter toSync;
- private int initialSolutionSetInput; // undefined for bulk iterations
-
private int feedbackDataInput; // workset or bulk partial solution
- private RuntimeAggregatorRegistry aggregatorRegistry;
-
// --------------------------------------------------------------------------------------------
@Override
@@ -115,15 +109,15 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
// at this time, the outputs to the step function are created
// add the outputs for the final solution
- this.finalOutputWriters = new ArrayList<RecordWriter<?>>();
+ List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
this.finalOutputCollector = RegularPactTask.getOutputCollector(this, finalOutConfig,
- userCodeClassLoader, this.finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());
+ userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());
// sanity check the setup
final int writersIntoStepFunction = this.eventualOutputs.size();
- final int writersIntoFinalResult = this.finalOutputWriters.size();
+ final int writersIntoFinalResult = finalOutputWriters.size();
final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();
if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
@@ -207,13 +201,12 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
- JoinHashMap<BT> map = new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
- return map;
+ return new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
}
private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
solutionSet.open();
- solutionSet.buildTable(solutionSetInput);
+ solutionSet.buildTableWithUniqueKey(solutionSetInput);
}
private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
@@ -255,14 +248,13 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;
feedbackDataInput = config.getIterationHeadPartialSolutionOrWorksetInputIndex();
- feedbackTypeSerializer = this.<Y>getInputSerializer(feedbackDataInput);
+ feedbackTypeSerializer = this.getInputSerializer(feedbackDataInput);
excludeFromReset(feedbackDataInput);
+ int initialSolutionSetInput;
if (isWorksetIteration) {
initialSolutionSetInput = config.getIterationHeadSolutionSetInputIndex();
- TypeSerializerFactory<X> solutionTypeSerializerFactory = config
- .getSolutionSetSerializer(getUserCodeClassLoader());
- solutionTypeSerializer = solutionTypeSerializerFactory;
+ solutionTypeSerializer = config.getSolutionSetSerializer(getUserCodeClassLoader());
// setup the index for the solution set
@SuppressWarnings("unchecked")
@@ -283,10 +275,9 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier);
}
- } else {
+ }
+ else {
// bulk iteration case
- initialSolutionSetInput = -1;
-
@SuppressWarnings("unchecked")
TypeSerializerFactory<X> solSer = (TypeSerializerFactory<X>) feedbackTypeSerializer;
solutionTypeSerializer = solSer;
@@ -299,7 +290,7 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
}
// instantiate all aggregators and register them at the iteration global registry
- aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators
+ RuntimeAggregatorRegistry aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators
(getUserCodeClassLoader()));
IterationAggregatorBroker.instance().handIn(brokerKey, aggregatorRegistry);
@@ -392,7 +383,6 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
if (solutionSet != null) {
solutionSet.close();
- solutionSet = null;
}
}
}
@@ -434,8 +424,8 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
log.debug(formatLogString("Sending end-of-superstep to all iteration outputs."));
}
- for (int outputIndex = 0; outputIndex < this.eventualOutputs.size(); outputIndex++) {
- this.eventualOutputs.get(outputIndex).sendEndOfSuperstep();
+ for (RecordWriter<?> eventualOutput : this.eventualOutputs) {
+ eventualOutput.sendEndOfSuperstep();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/923a2ae2/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 7107972..6533e19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -337,6 +337,16 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
insert(record);
}
}
+
+ public void buildTableWithUniqueKey(final MutableObjectIterator<T> input) throws IOException {
+ T record = this.buildSideSerializer.createInstance();
+ T tmp = this.buildSideSerializer.createInstance();
+
+ // go over the complete input and insert every element into the hash table
+ while (this.running && ((record = input.next(record)) != null)) {
+ insertOrReplaceRecord(record, tmp);
+ }
+ }
public final void insert(T record) throws IOException {
if(this.closed.get()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/923a2ae2/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
new file mode 100644
index 0000000..c987dfd
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class SolutionSetDuplicatesITCase extends MultipleProgramsTestBase {
+
+ public SolutionSetDuplicatesITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Test
+ public void testProgram() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> data = env
+ .generateSequence(0, 10)
+ .flatMap(new FlatMapFunction<Long, Tuple2<Long, Long>>() {
+ @Override
+ public void flatMap(Long value, Collector<Tuple2<Long, Long>> out) {
+ out.collect(new Tuple2<Long, Long>(value, value));
+ out.collect(new Tuple2<Long, Long>(value, value));
+ out.collect(new Tuple2<Long, Long>(value, value));
+ }
+ })
+ .rebalance();
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = data.iterateDelta(data, 10, 0);
+
+ List<Integer> result = iter
+ .closeWith(iter.getWorkset(), iter.getWorkset())
+ .map(new MapFunction<Tuple2<Long,Long>, Integer>() {
+ @Override
+ public Integer map(Tuple2<Long, Long> value) {
+ return value.f0.intValue();
+ }
+ })
+ .collect();
+
+ assertEquals(11, result.size());
+
+ Collections.sort(result);
+ assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10), result);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
[4/4] flink git commit: [FLINK-1796] [jobmanager] In local mode,
the embedded TaskManager is watched by a process reaper as well.
Posted by se...@apache.org.
[FLINK-1796] [jobmanager] In local mode, the embedded TaskManager is watched by a process reaper as well.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c321425
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c321425
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c321425
Branch: refs/heads/master
Commit: 8c32142528590a030693529c7c8d93f194968c0a
Parents: d6ea1f2
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 27 20:26:33 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:34:32 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/runtime/process/ProcessReaper.java | 2 +-
.../org/apache/flink/runtime/jobmanager/JobManager.scala | 8 +++++++-
2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8c321425/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
index b12b82d..5ab550f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
@@ -47,7 +47,7 @@ public class ProcessReaper extends UntypedActor {
if (message instanceof Terminated) {
try {
Terminated term = (Terminated) message;
- String name = term.actor().path().name();
+ String name = term.actor().path().toSerializationFormat();
if (log != null) {
log.error("Actor " + name + " terminated, stopping process...");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c321425/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2233dbf..9aa476d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -769,8 +769,14 @@ object JobManager {
if (executionMode == JobManagerMode.LOCAL) {
LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")
- TaskManager.startTaskManagerActor(configuration, jobManagerSystem, listeningAddress,
+ val taskManagerActor = TaskManager.startTaskManagerActor(
+ configuration, jobManagerSystem, listeningAddress,
TaskManager.TASK_MANAGER_NAME, true, true, classOf[TaskManager])
+
+ LOG.debug("Starting TaskManager process reaper")
+ jobManagerSystem.actorOf(
+ Props(classOf[ProcessReaper], taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
+ "TaskManager_Process_Reaper")
}
// start the job manager web frontend
[3/4] flink git commit: [tests] Add template log4j.properties files
for tests
Posted by se...@apache.org.
[tests] Add template log4j.properties files for tests
Also clean up some unused imports.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d03dd63b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d03dd63b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d03dd63b
Branch: refs/heads/master
Commit: d03dd63b7034d3bdcef6df282b5277ba9d6ed1c4
Parents: 8c32142
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Mar 29 16:55:06 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:34:32 2015 +0200
----------------------------------------------------------------------
.../src/test/resources/log4j-test.properties | 15 ++++++-----
.../src/test/resources/log4j-test.properties | 10 +++++++-
.../src/test/resources/log4j-test.properties | 13 +++++++++-
.../src/test/resources/log4j.properties | 27 --------------------
.../StreamCheckpointingITCase.java | 9 ++++---
.../src/test/resources/log4j-test.properties | 2 +-
6 files changed, 36 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d03dd63b/flink-clients/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/resources/log4j-test.properties b/flink-clients/src/test/resources/log4j-test.properties
index 04ec355..85897b3 100644
--- a/flink-clients/src/test/resources/log4j-test.properties
+++ b/flink-clients/src/test/resources/log4j-test.properties
@@ -16,12 +16,15 @@
# limitations under the License.
################################################################################
-# Set root logger level to OFF and its only appender to A1.
-log4j.rootLogger=OFF, A1
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d03dd63b/flink-core/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/log4j-test.properties b/flink-core/src/test/resources/log4j-test.properties
index 2fb9345..4c74d85 100644
--- a/flink-core/src/test/resources/log4j-test.properties
+++ b/flink-core/src/test/resources/log4j-test.properties
@@ -16,4 +16,12 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=OFF
\ No newline at end of file
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/d03dd63b/flink-optimizer/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/resources/log4j-test.properties b/flink-optimizer/src/test/resources/log4j-test.properties
index 2fb9345..85897b3 100644
--- a/flink-optimizer/src/test/resources/log4j-test.properties
+++ b/flink-optimizer/src/test/resources/log4j-test.properties
@@ -16,4 +16,15 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=OFF
\ No newline at end of file
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d03dd63b/flink-optimizer/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/resources/log4j.properties b/flink-optimizer/src/test/resources/log4j.properties
deleted file mode 100644
index fa3f937..0000000
--- a/flink-optimizer/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target = System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d03dd63b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 797555a..10773f5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -28,17 +28,18 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
import org.apache.flink.util.Collector;
-import org.junit.Test;
-import org.junit.BeforeClass;
import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* A simple test that runs a streaming topology with checkpointing enabled.
http://git-wip-us.apache.org/repos/asf/flink/blob/d03dd63b/flink-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties
index 0845c81..85897b3 100644
--- a/flink-tests/src/test/resources/log4j-test.properties
+++ b/flink-tests/src/test/resources/log4j-test.properties
@@ -18,7 +18,7 @@
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
# A1 is set to be a ConsoleAppender.
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
[2/4] flink git commit: [tests] Remove incomplete and unused
LineRankITCase.
Posted by se...@apache.org.
[tests] Remove incomplete and unused LineRankITCase.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6ea1f22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6ea1f22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6ea1f22
Branch: refs/heads/master
Commit: d6ea1f227a309a04bfef2d051d8240736fb186f0
Parents: 923a2ae
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 27 20:01:20 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:34:32 2015 +0200
----------------------------------------------------------------------
.../flink/test/iterative/LineRankITCase.java | 92 --------------------
1 file changed, 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d6ea1f22/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java
deleted file mode 100644
index fa13656..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java
+++ /dev/null
@@ -1,92 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.test.iterative;
-//
-//import java.util.Collection;
-//
-//import org.apache.flink.api.common.Plan;
-//import org.apache.flink.configuration.Configuration;
-//import org.apache.flink.examples.scala.graph.LineRank;
-//import org.apache.flink.test.util.RecordAPITestBase;
-//import org.junit.runner.RunWith;
-//import org.junit.runners.Parameterized;
-//import org.junit.runners.Parameterized.Parameters;
-//
-//@RunWith(Parameterized.class)
-//public class LineRankITCase extends RecordAPITestBase {
-//
-// private static final String SOURCE_INCIDENCE = "1,1,1\n" +
-// "2,1,1\n" +
-// "3,1,1\n" +
-// "4,2,1\n" +
-// "5,3,1\n" +
-// "6,3,1\n" +
-// "7,4,1\n" +
-// "8,4,1\n" +
-// "9,5,1\n";
-//
-// private static final String TARGET_INCIDENCE = "1,2,1\n" +
-// "2,3,1\n" +
-// "3,4,1\n" +
-// "4,3,1\n" +
-// "5,2,1\n" +
-// "6,5,1\n" +
-// "7,1,1\n" +
-// "8,3,1\n" +
-// "9,4,1\n";
-//
-// protected String sourcesPath;
-// protected String targetsPath;
-// protected String resultPath;
-//
-//
-// public LineRankITCase(Configuration config) {
-// super(config);
-// setTaskManagerNumSlots(parallelism);
-// }
-//
-// @Override
-// protected void preSubmit() throws Exception {
-// sourcesPath = createTempFile("sourceIncidence.txt", SOURCE_INCIDENCE);
-// targetsPath = createTempFile("targetIncidence.txt", TARGET_INCIDENCE);
-// resultPath = getTempFilePath("results");
-// }
-//
-// @Override
-// protected Plan getTestJob() {
-// LineRank lr = new LineRank();
-//
-// Plan plan = lr.getScalaPlan(
-// config.getInteger("NumSubtasks", 1),
-// sourcesPath,
-// targetsPath,
-// 9,
-// resultPath);
-// return plan;
-// }
-//
-// @Parameters
-// public static Collection<Object[]> getConfigurations() {
-// Configuration config1 = new Configuration();
-// config1.setInteger("NumSubtasks", parallelism);
-// config1.setInteger("NumIterations", 5);
-// return toParameterList(config1);
-// }
-//}