You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/26 16:23:18 UTC
[1/4] flink git commit: [FLINK-2084] [core] Add an option to start
Flink in streaming mode
Repository: flink
Updated Branches:
refs/heads/master 924830ffa -> 043772244
[FLINK-2084] [core] Add an option to start Flink in streaming mode
- Streaming mode sets the memory manager to lazy memory allocation to ensure
heap is not blocked by batch memory manager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efec2297
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efec2297
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efec2297
Branch: refs/heads/master
Commit: efec2297b46b83e0dc6111a91a3f80f9d5375e0d
Parents: ea60678
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 22 17:12:45 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 26 16:22:22 2015 +0200
----------------------------------------------------------------------
flink-dist/src/main/flink-bin/bin/jobmanager.sh | 5 +-
.../flink-bin/bin/start-cluster-streaming.sh | 48 +++++++
.../src/main/flink-bin/bin/start-cluster.sh | 2 +-
.../main/flink-bin/bin/start-local-streaming.sh | 27 ++++
.../src/main/flink-bin/bin/start-local.bat | 2 +-
.../src/main/flink-bin/bin/start-local.sh | 2 +-
.../src/main/flink-bin/bin/taskmanager.sh | 5 +-
.../org/apache/flink/runtime/StreamingMode.java | 34 +++++
.../jobmanager/JobManagerCliOptions.java | 77 ++++++++++++
.../apache/flink/runtime/taskmanager/Task.java | 2 +-
.../taskmanager/TaskManagerCliOptions.java | 57 +++++++++
.../flink/runtime/util/SignalHandler.java | 2 +-
.../flink/runtime/jobmanager/JobManager.scala | 125 ++++++++++---------
.../jobmanager/JobManagerCLIConfiguration.scala | 28 -----
.../runtime/minicluster/FlinkMiniCluster.scala | 9 +-
.../minicluster/LocalFlinkMiniCluster.scala | 31 +++--
.../flink/runtime/taskmanager/TaskManager.scala | 69 ++++++----
.../TaskManagerCLIConfiguration.scala | 26 ----
18 files changed, 402 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index 273cdd3..0f1e4fa 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -20,6 +20,7 @@
STARTSTOP=$1
EXECUTIONMODE=$2
+STREAMINGMODE=$3
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
@@ -80,7 +81,7 @@ case $STARTSTOP in
rotateLogFile $out
echo "Starting Job Manager"
- $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_JM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.jobmanager.JobManager --executionMode $EXECUTIONMODE --configDir "$FLINK_CONF_DIR" > "$out" 2>&1 < /dev/null &
+ $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_JM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.jobmanager.JobManager --configDir "$FLINK_CONF_DIR" --executionMode $EXECUTIONMODE --streamingMode "$STREAMINGMODE" > "$out" 2>&1 < /dev/null &
echo $! > $pid
;;
@@ -99,7 +100,7 @@ case $STARTSTOP in
;;
(*)
- echo "Please specify 'start (cluster|local)' or stop"
+ echo "Please specify 'start (cluster|local) [batch|streaming]' or 'stop'"
;;
esac
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh b/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
new file mode 100755
index 0000000..86a87cd
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
@@ -0,0 +1,48 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+HOSTLIST=$FLINK_SLAVES
+
+if [ "$HOSTLIST" = "" ]; then
+ HOSTLIST="${FLINK_CONF_DIR}/slaves"
+fi
+
+if [ ! -f "$HOSTLIST" ]; then
+ echo $HOSTLIST is not a valid slave list
+ exit 1
+fi
+
+# cluster mode, bring up job manager locally and a task manager on every slave host
+"$FLINK_BIN_DIR"/jobmanager.sh start cluster streaming
+
+GOON=true
+while $GOON
+do
+ read line || GOON=false
+ if [ -n "$line" ]; then
+ HOST=$( extractHostName $line)
+ ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start streaming &"
+ fi
+done < "$HOSTLIST"
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index db65032..666edeb 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -43,6 +43,6 @@ do
read line || GOON=false
if [ -n "$line" ]; then
HOST=$( extractHostName $line)
- ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start &"
+ ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start batch &"
fi
done < "$HOSTLIST"
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh b/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
new file mode 100755
index 0000000..2cb4d4a
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# local mode, only bring up job manager. The job manager will start an internal task manager
+"$FLINK_BIN_DIR"/jobmanager.sh start local streaming
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-local.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.bat b/flink-dist/src/main/flink-bin/bin/start-local.bat
index 386a631..202c7d9 100644
--- a/flink-dist/src/main/flink-bin/bin/start-local.bat
+++ b/flink-dist/src/main/flink-bin/bin/start-local.bat
@@ -57,6 +57,6 @@ if not defined FOUND (
echo Starting Flink job manager. Webinterface by default on http://localhost:8081/.
echo Don't close this batch window. Stop job manager by pressing Ctrl+C.
-java %JVM_ARGS% %log_setting% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.runtime.jobmanager.JobManager --executionMode local --configDir "%FLINK_CONF_DIR%" > "%out%" 2>&1
+java %JVM_ARGS% %log_setting% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.runtime.jobmanager.JobManager --configDir "%FLINK_CONF_DIR%" --executionMode local --streamingMode batch > "%out%" 2>&1
endlocal
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-local.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.sh b/flink-dist/src/main/flink-bin/bin/start-local.sh
index f382763..7ea3ff4 100755
--- a/flink-dist/src/main/flink-bin/bin/start-local.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-local.sh
@@ -24,4 +24,4 @@ bin=`cd "$bin"; pwd`
. "$bin"/config.sh
# local mode, only bring up job manager. The job manager will start an internal task manager
-"$FLINK_BIN_DIR"/jobmanager.sh start local
+"$FLINK_BIN_DIR"/jobmanager.sh start local batch
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index 557ea2b..a99d39d 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -19,6 +19,7 @@
STARTSTOP=$1
+STREAMINGMODE=$2
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
@@ -68,7 +69,7 @@ case $STARTSTOP in
rotateLogFile $out
echo Starting task manager on host $HOSTNAME
- $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.taskmanager.TaskManager --configDir "$FLINK_CONF_DIR" > "$out" 2>&1 < /dev/null &
+ $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.taskmanager.TaskManager --configDir "$FLINK_CONF_DIR" --streamingMode "$STREAMINGMODE" > "$out" 2>&1 < /dev/null &
echo $! > $pid
;;
@@ -87,7 +88,7 @@ case $STARTSTOP in
;;
(*)
- echo Please specify start or stop
+ echo "Please specify 'start [batch|streaming]' or 'stop'"
;;
esac
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java
new file mode 100644
index 0000000..bdcbcf9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime;
+
+/**
+ * The streaming mode defines whether the system starts in streaming mode,
+ * or in pure batch mode. Note that streaming mode can execute batch programs
+ * as well.
+ */
+public enum StreamingMode {
+
+ /** This mode indicates the system can run streaming tasks, of which batch
+ * tasks are a special case. */
+ STREAMING,
+
+ /** This mode indicates that the system can run only batch tasks */
+ BATCH_ONLY;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
new file mode 100644
index 0000000..988e3a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.StreamingMode;
+
+/**
+ * The command line parameters passed to the TaskManager.
+ */
+public class JobManagerCliOptions {
+
+ private String configDir;
+
+ private JobManagerMode jobManagerMode;
+
+ private StreamingMode streamingMode = StreamingMode.BATCH_ONLY;
+
+ // ------------------------------------------------------------------------
+
+ public String getConfigDir() {
+ return configDir;
+ }
+
+ public void setConfigDir(String configDir) {
+ this.configDir = configDir;
+ }
+
+ public JobManagerMode getJobManagerMode() {
+ return jobManagerMode;
+ }
+
+ public void setJobManagerMode(String modeName) {
+ if (modeName.equalsIgnoreCase("cluster")) {
+ this.jobManagerMode = JobManagerMode.CLUSTER;
+ }
+ else if (modeName.equalsIgnoreCase("local")) {
+ this.jobManagerMode = JobManagerMode.LOCAL;
+ }
+ else {
+ throw new IllegalArgumentException(
+ "Unknown execution mode. Execution mode must be one of 'cluster' or 'local'.");
+ }
+ }
+
+ public StreamingMode getStreamingMode() {
+ return streamingMode;
+ }
+
+ public void setStreamingMode(String modeName) {
+ if (modeName.equalsIgnoreCase("streaming")) {
+ this.streamingMode = StreamingMode.STREAMING;
+ }
+ else if (modeName.equalsIgnoreCase("batch")) {
+ this.streamingMode = StreamingMode.BATCH_ONLY;
+ }
+ else {
+ throw new IllegalArgumentException(
+ "Unknown streaming mode. Streaming mode must be one of 'BATCH' or 'STREAMING'.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 51ce91f..40198dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -74,7 +74,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
/**
* The Task represents one execution of a parallel subtask on a TaskManager.
* A Task wraps a Flink operator (which may be a user function) and
- * runs it, providing all service necessary for example to consume input data,
+ * runs it, providing all services necessary for example to consume input data,
* produce its results (intermediate result partitions) and communicate
* with the JobManager.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java
new file mode 100644
index 0000000..a648caf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.StreamingMode;
+
+/**
+ * The command line parameters passed to the TaskManager.
+ */
+public class TaskManagerCliOptions {
+
+ private String configDir;
+
+ private StreamingMode mode = StreamingMode.BATCH_ONLY;
+
+ // ------------------------------------------------------------------------
+
+ public String getConfigDir() {
+ return configDir;
+ }
+
+ public void setConfigDir(String configDir) {
+ this.configDir = configDir;
+ }
+
+ public StreamingMode getMode() {
+ return mode;
+ }
+
+ public void setMode(String modeName) {
+ if (modeName.equalsIgnoreCase("streaming")) {
+ this.mode = StreamingMode.STREAMING;
+ }
+ else if (modeName.equalsIgnoreCase("batch")) {
+ this.mode = StreamingMode.BATCH_ONLY;
+ }
+ else {
+ throw new IllegalArgumentException("Mode must be one of 'BATCH' or 'STREAMING'.");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
index 546e142..bcd3dc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
@@ -21,7 +21,7 @@ import org.slf4j.Logger;
import sun.misc.Signal;
/**
- * This signal handler / signal logger is based on Apache Hadoops org.apache.hadoop.util.SignalLogger.
+ * This signal handler / signal logger is based on Apache Hadoop's org.apache.hadoop.util.SignalLogger.
*/
public class SignalHandler {
private static boolean registered = false;
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0c71938..ba819ca 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
-import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
+import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.instance.InstanceManager
@@ -99,7 +99,8 @@ class JobManager(protected val flinkConfiguration: Configuration,
protected val accumulatorManager: AccumulatorManager,
protected val defaultExecutionRetries: Int,
protected val delayBetweenRetries: Long,
- protected val timeout: FiniteDuration)
+ protected val timeout: FiniteDuration,
+ protected val mode: StreamingMode)
extends Actor with ActorLogMessages with ActorSynchronousLogging {
/** List of current jobs running jobs */
@@ -759,10 +760,11 @@ object JobManager {
val STARTUP_FAILURE_RETURN_CODE = 1
val RUNTIME_FAILURE_RETURN_CODE = 2
+ /** Name of the JobManager actor */
val JOB_MANAGER_NAME = "jobmanager"
- val EVENT_COLLECTOR_NAME = "eventcollector"
+
+ /** Name of the archive actor */
val ARCHIVE_NAME = "archive"
- val PROFILER_NAME = "profiler"
/**
@@ -778,6 +780,7 @@ object JobManager {
// parsing the command line arguments
val (configuration: Configuration,
executionMode: JobManagerMode,
+ streamingMode: StreamingMode,
listeningHost: String, listeningPort: Int) =
try {
parseArgs(args)
@@ -814,13 +817,15 @@ object JobManager {
LOG.info("Security is enabled. Starting secure JobManager.")
SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
override def run(): Unit = {
- runJobManager(configuration, executionMode, listeningHost, listeningPort)
+ runJobManager(configuration, executionMode, streamingMode,
+ listeningHost, listeningPort)
}
})
}
else {
LOG.info("Security is not enabled. Starting non-authenticated JobManager.")
- runJobManager(configuration, executionMode, listeningHost, listeningPort)
+ runJobManager(configuration, executionMode, streamingMode,
+ listeningHost, listeningPort)
}
}
catch {
@@ -842,11 +847,13 @@ object JobManager {
* @param configuration The configuration object for the JobManager.
* @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn an
* an additional TaskManager in the same process.
+ * @param streamingMode The streaming mode to run the system in (streaming vs. batch-only)
* @param listeningAddress The hostname where the JobManager should listen for messages.
* @param listeningPort The port where the JobManager should listen for messages.
*/
def runJobManager(configuration: Configuration,
executionMode: JobManagerMode,
+ streamingMode: StreamingMode,
listeningAddress: String,
listeningPort: Int) : Unit = {
@@ -880,7 +887,8 @@ object JobManager {
try {
// bring up the job manager actor
LOG.info("Starting JobManager actor")
- val (jobManager, archiver) = startJobManagerActors(configuration, jobManagerSystem)
+ val (jobManager, archiver) = startJobManagerActors(configuration,
+ jobManagerSystem, streamingMode)
// start a process reaper that watches the JobManager. If the JobManager actor dies,
// the process reaper will kill the JVM process (to ensure easy failure detection)
@@ -898,7 +906,8 @@ object JobManager {
listeningAddress,
Some(TaskManager.TASK_MANAGER_NAME),
Some(jobManager.path.toString),
- true, classOf[TaskManager])
+ true, streamingMode,
+ classOf[TaskManager])
LOG.debug("Starting TaskManager process reaper")
jobManagerSystem.actorOf(
@@ -936,61 +945,59 @@ object JobManager {
* @param args command line arguments
* @return Quadruple of configuration, execution mode and an optional listening address
*/
- def parseArgs(args: Array[String]): (Configuration, JobManagerMode, String, Int) = {
- val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("JobManager") {
+ def parseArgs(args: Array[String]): (Configuration, JobManagerMode, StreamingMode, String, Int) = {
+ val parser = new scopt.OptionParser[JobManagerCliOptions]("JobManager") {
head("Flink JobManager")
- opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text {
- "The configuration directory." }
-
- opt[String]("executionMode") action { (arg, c) =>
- val argLower = arg.toLowerCase()
- var result: JobManagerCLIConfiguration = null
-
- for (mode <- JobManagerMode.values() if result == null) {
- val modeName = mode.name().toLowerCase()
-
- if (modeName.equals(argLower)) {
- result = c.copy(executionMode = mode)
- }
- }
+ opt[String]("configDir") action { (arg, conf) =>
+ conf.setConfigDir(arg)
+ conf
+ } text {
+ "The configuration directory."
+ }
- if (result == null) {
- throw new Exception("Unknown execution mode: " + arg)
- } else {
- result
- }
+ opt[String]("executionMode") action { (arg, conf) =>
+ conf.setJobManagerMode(arg)
+ conf
} text {
"The execution mode of the JobManager (CLUSTER / LOCAL)"
}
- }
- parser.parse(args, JobManagerCLIConfiguration()) map {
- config =>
+ opt[String]("streamingMode").optional().action { (arg, conf) =>
+ conf.setStreamingMode(arg)
+ conf
+ } text {
+ "The streaming mode of the JobManager (STREAMING / BATCH)"
+ }
+ }
- if (config.configDir == null) {
- throw new Exception("Missing parameter '--configDir'")
- }
- if (config.executionMode == null) {
- throw new Exception("Missing parameter '--executionMode'")
- }
+ val config = parser.parse(args, new JobManagerCliOptions()).getOrElse {
+ throw new Exception(
+ s"Invalid command line agruments: ${args.mkString(" ")}. Usage: ${parser.usage}")
+ }
+
+ val configDir = config.getConfigDir()
+
+ if (configDir == null) {
+ throw new Exception("Missing parameter '--configDir'")
+ }
+ if (config.getJobManagerMode() == null) {
+ throw new Exception("Missing parameter '--executionMode'")
+ }
- LOG.info("Loading configuration from " + config.configDir)
- GlobalConfiguration.loadConfiguration(config.configDir)
- val configuration = GlobalConfiguration.getConfiguration
+ LOG.info("Loading configuration from " + configDir)
+ GlobalConfiguration.loadConfiguration(configDir)
+ val configuration = GlobalConfiguration.getConfiguration()
- if (config.configDir != null && new File(config.configDir).isDirectory) {
- configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..")
- }
+ if (new File(configDir).isDirectory) {
+ configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
+ }
- val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
- val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+ val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+ val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
- (configuration, config.executionMode, hostname, port)
- } getOrElse {
- throw new Exception("Invalid command line arguments: " + parser.usage)
- }
+ (configuration, config.getJobManagerMode(), config.getStreamingMode(), hostname, port)
}
/**
@@ -1082,9 +1089,12 @@ object JobManager {
* @return A tuple of references (JobManager Ref, Archiver Ref)
*/
def startJobManagerActors(configuration: Configuration,
- actorSystem: ActorSystem): (ActorRef, ActorRef) = {
+ actorSystem: ActorSystem,
+ streamingMode: StreamingMode): (ActorRef, ActorRef) = {
- startJobManagerActors(configuration,actorSystem, Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME))
+ startJobManagerActors(configuration, actorSystem,
+ Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME),
+ streamingMode)
}
/**
* Starts the JobManager and job archiver based on the given configuration, in the
@@ -1096,18 +1106,21 @@ object JobManager {
* the actor will have the name generated by the actor system.
* @param archiverActorName Optionally the name of the archive actor. If none is given,
* the actor will have the name generated by the actor system.
+ * @param streamingMode The mode to run the system in (streaming vs. batch-only)
+ *
* @return A tuple of references (JobManager Ref, Archiver Ref)
*/
def startJobManagerActors(configuration: Configuration,
actorSystem: ActorSystem,
jobMangerActorName: Option[String],
- archiverActorName: Option[String]): (ActorRef, ActorRef) = {
+ archiverActorName: Option[String],
+ streamingMode: StreamingMode): (ActorRef, ActorRef) = {
val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
executionRetries, delayBetweenRetries,
timeout, _) = createJobManagerComponents(configuration)
- // start the archiver wither with the given name, or without (avoid name conflicts)
+ // start the archiver with the given name, or without (avoid name conflicts)
val archiver: ActorRef = archiverActorName match {
case Some(actorName) => actorSystem.actorOf(archiveProps, actorName)
case None => actorSystem.actorOf(archiveProps)
@@ -1115,7 +1128,7 @@ object JobManager {
val jobManagerProps = Props(classOf[JobManager], configuration, instanceManager, scheduler,
libraryCacheManager, archiver, accumulatorManager, executionRetries,
- delayBetweenRetries, timeout)
+ delayBetweenRetries, timeout, streamingMode)
val jobManager: ActorRef = jobMangerActorName match {
case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
deleted file mode 100644
index 4cd02c5..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-/**
- * Holder for command line parameters of the JobManager.
- *
- * @param configDir The directory to load the configuration from.
- * @param executionMode Mode for the JobManager.
- */
-case class JobManagerCLIConfiguration(configDir: String = null,
- executionMode: JobManagerMode = null) {}
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 8a6c394..edc12a1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -26,6 +26,7 @@ import akka.actor.{ActorRef, ActorSystem}
import com.typesafe.config.Config
import org.apache.flink.api.common.JobSubmissionResult
import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.client.{JobExecutionException, JobClient, SerializedJobExecutionResult}
import org.apache.flink.runtime.jobgraph.JobGraph
@@ -44,10 +45,16 @@ import scala.concurrent.{Future, Await}
* @param userConfiguration Configuration object with the user provided configuration values
* @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
* [[ActorSystem]], otherwise false
+ * @param streamingMode True, if the system should be started in streaming mode, false if
+ * in pure batch mode.
*/
abstract class FlinkMiniCluster(val userConfiguration: Configuration,
- val singleActorSystem: Boolean) {
+ val singleActorSystem: Boolean,
+ val streamingMode: StreamingMode) {
+ def this(userConfiguration: Configuration, singleActorSystem: Boolean)
+ = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+
protected val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
// --------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index e2d7cc1..663307d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,18 +18,19 @@
package org.apache.flink.runtime.minicluster
-import akka.actor.{ActorRef, ActorSystem}
+import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
+
import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.client.JobClient
import org.apache.flink.runtime.io.network.netty.NettyConfig
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util.EnvironmentInformation
+
import org.slf4j.LoggerFactory
-import akka.actor.ExtendedActorSystem
/**
* Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same
@@ -41,9 +42,20 @@ import akka.actor.ExtendedActorSystem
* @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
* [[ActorSystem]], otherwise false
*/
-class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true)
- extends FlinkMiniCluster(userConfiguration, singleActorSystem) {
-
+class LocalFlinkMiniCluster(userConfiguration: Configuration,
+ singleActorSystem: Boolean,
+ streamingMode: StreamingMode)
+ extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
+
+
+ def this(userConfiguration: Configuration, singleActorSystem: Boolean)
+ = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+
+ def this(userConfiguration: Configuration) = this(userConfiguration, true)
+
+ // --------------------------------------------------------------------------
+
+
val jobClientActorSystem = if (singleActorSystem) {
jobManagerActorSystem
} else {
@@ -64,7 +76,9 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
override def startJobManager(system: ActorSystem): ActorRef = {
val config = configuration.clone()
- val (jobManager, archiver) = JobManager.startJobManagerActors(config, system)
+
+ val (jobManager, archiver) = JobManager.startJobManagerActors(config, system, streamingMode)
+
if (config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false)) {
val webServer = new WebInfoServer(configuration, jobManager, archiver)
webServer.start()
@@ -103,12 +117,13 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
} else {
None
}
-
+
TaskManager.startTaskManagerComponentsAndActor(config, system,
HOSTNAME, // network interface to bind to
Some(taskManagerActorName), // actor name
jobManagerPath, // job manager akka URL
localExecution, // start network stack?
+ streamingMode,
classOf[TaskManager])
}
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a95b5cb..8a45fa4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -37,7 +37,7 @@ import grizzled.slf4j.Logger
import org.apache.flink.configuration._
import org.apache.flink.runtime.messages.checkpoint.{ConfirmCheckpoint, TriggerCheckpoint, AbstractCheckpointMessage}
-import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
+import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.blob.{BlobService, BlobCache}
import org.apache.flink.runtime.broadcast.BroadcastVariableManager
@@ -999,8 +999,8 @@ object TaskManager {
/** Return code for critical errors during the runtime */
val RUNTIME_FAILURE_RETURN_CODE = 2
+ /** The name of the TaskManager actor */
val TASK_MANAGER_NAME = "taskmanager"
- val PROFILER_NAME = "profiler"
/** Maximum time (msecs) that the TaskManager will spend searching for a
* suitable network interface to use for communication */
@@ -1033,7 +1033,8 @@ object TaskManager {
EnvironmentInformation.checkJavaVersion()
// try to parse the command line arguments
- val configuration = try {
+ val (configuration: Configuration,
+ mode: StreamingMode) = try {
parseArgsAndLoadConfig(args)
}
catch {
@@ -1050,13 +1051,13 @@ object TaskManager {
LOG.info("Security is enabled. Starting secure TaskManager.")
SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
override def run(): Unit = {
- selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager])
+ selectNetworkInterfaceAndRunTaskManager(configuration, mode, classOf[TaskManager])
}
})
}
else {
LOG.info("Security is not enabled. Starting non-authenticated TaskManager.")
- selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager])
+ selectNetworkInterfaceAndRunTaskManager(configuration, mode, classOf[TaskManager])
}
}
catch {
@@ -1074,31 +1075,44 @@ object TaskManager {
* @return The parsed configuration.
*/
@throws(classOf[Exception])
- def parseArgsAndLoadConfig(args: Array[String]): Configuration = {
-
+ def parseArgsAndLoadConfig(args: Array[String]): (Configuration, StreamingMode) = {
+
// set up the command line parser
- val parser = new scopt.OptionParser[TaskManagerCLIConfiguration]("taskmanager") {
- head("flink task manager")
- opt[String]("configDir") action { (x, c) =>
- c.copy(configDir = x)
- } text "Specify configuration directory."
+ val parser = new scopt.OptionParser[TaskManagerCliOptions]("TaskManager") {
+ head("Flink TaskManager")
+
+ opt[String]("configDir") action { (param, conf) =>
+ conf.setConfigDir(param)
+ conf
+ } text {
+ "Specify configuration directory."
+ }
+
+ opt[String]("streamingMode").optional().action { (param, conf) =>
+ conf.setMode(param)
+ conf
+ } text {
+ "The streaming mode of the JobManager (STREAMING / BATCH)"
+ }
}
// parse the CLI arguments
- val cliConfig = parser.parse(args, TaskManagerCLIConfiguration()).getOrElse {
+ val cliConfig = parser.parse(args, new TaskManagerCliOptions()).getOrElse {
throw new Exception(
s"Invalid command line agruments: ${args.mkString(" ")}. Usage: ${parser.usage}")
}
// load the configuration
- try {
- LOG.info("Loading configuration from " + cliConfig.configDir)
- GlobalConfiguration.loadConfiguration(cliConfig.configDir)
+ val conf: Configuration = try {
+ LOG.info("Loading configuration from " + cliConfig.getConfigDir())
+ GlobalConfiguration.loadConfiguration(cliConfig.getConfigDir())
GlobalConfiguration.getConfiguration()
}
catch {
case e: Exception => throw new Exception("Could not load configuration", e)
}
+
+ (conf, cliConfig.getMode)
}
// --------------------------------------------------------------------------
@@ -1120,11 +1134,13 @@ object TaskManager {
* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
* @param configuration The configuration for the TaskManager.
+ * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
* @param taskManagerClass The actor class to instantiate.
* Allows to use TaskManager subclasses for example for YARN.
*/
@throws(classOf[Exception])
def selectNetworkInterfaceAndRunTaskManager(configuration: Configuration,
+ streamingMode: StreamingMode,
taskManagerClass: Class[_ <: TaskManager]) : Unit = {
val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
@@ -1132,7 +1148,8 @@ object TaskManager {
val (taskManagerHostname, actorSystemPort) =
selectNetworkInterfaceAndPort(configuration, jobManagerHostname, jobManagerPort)
- runTaskManager(taskManagerHostname, actorSystemPort, configuration, taskManagerClass)
+ runTaskManager(taskManagerHostname, actorSystemPort, configuration,
+ streamingMode, taskManagerClass)
}
@throws(classOf[IOException])
@@ -1196,14 +1213,17 @@ object TaskManager {
* @param taskManagerHostname The hostname/address of the interface where the actor system
* will communicate.
* @param actorSystemPort The port at which the actor system will communicate.
+ * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
* @param configuration The configuration for the TaskManager.
*/
@throws(classOf[Exception])
def runTaskManager(taskManagerHostname: String,
- actorSystemPort: Int,
- configuration: Configuration) : Unit = {
+ actorSystemPort: Int,
+ configuration: Configuration,
+ streamingMode: StreamingMode) : Unit = {
- runTaskManager(taskManagerHostname, actorSystemPort, configuration, classOf[TaskManager])
+ runTaskManager(taskManagerHostname, actorSystemPort, configuration,
+ streamingMode, classOf[TaskManager])
}
/**
@@ -1218,6 +1238,7 @@ object TaskManager {
* will communicate.
* @param actorSystemPort The port at which the actor system will communicate.
* @param configuration The configuration for the TaskManager.
+ * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
* @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager
* subclasses for example for YARN.
*/
@@ -1225,6 +1246,7 @@ object TaskManager {
def runTaskManager(taskManagerHostname: String,
actorSystemPort: Int,
configuration: Configuration,
+ streamingMode: StreamingMode,
taskManagerClass: Class[_ <: TaskManager]) : Unit = {
LOG.info("Starting TaskManager")
@@ -1264,6 +1286,7 @@ object TaskManager {
taskManagerHostname,
Some(TASK_MANAGER_NAME),
None, false,
+ streamingMode,
taskManagerClass)
// start a process reaper that watches the JobManager. If the JobManager actor dies,
@@ -1317,6 +1340,7 @@ object TaskManager {
* JobManager hostname an port specified in the configuration.
* @param localTaskManagerCommunication If true, the TaskManager will not initiate the
* TCP network stack.
+ * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
* @param taskManagerClass The class of the TaskManager actor. May be used to give
* subclasses that understand additional actor messages.
*
@@ -1339,6 +1363,7 @@ object TaskManager {
taskManagerActorName: Option[String],
jobManagerPath: Option[String],
localTaskManagerCommunication: Boolean,
+ streamingMode: StreamingMode,
taskManagerClass: Class[_ <: TaskManager]): ActorRef = {
// get and check the JobManager config
@@ -1391,13 +1416,15 @@ object TaskManager {
relativeMemSize
}
+
+ val preAllocateMemory: Boolean = streamingMode == StreamingMode.BATCH_ONLY
// now start the memory manager
val memoryManager = try {
new DefaultMemoryManager(memorySize,
taskManagerConfig.numberOfSlots,
netConfig.networkBufferSize,
- true)
+ preAllocateMemory)
}
catch {
case e: OutOfMemoryError => throw new Exception(
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
deleted file mode 100644
index 5c71f5e..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-/**
- * Command line configuration object for the [[TaskManager]]
- *
- * @param configDir Path to configuration directory
- */
-case class TaskManagerCLIConfiguration(configDir: String = null)
[3/4] flink git commit: [tests] Adjust tests for dedicated streaming
mode and clean up test bases.
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 05093b5..762d77e 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -21,6 +21,7 @@ package org.apache.flink.test.util
import akka.actor.{Props, ActorRef, ActorSystem}
import akka.pattern.Patterns._
import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
@@ -39,11 +40,19 @@ import scala.concurrent.Await
* @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the
* same [[ActorSystem]], otherwise false.
*/
-class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean)
- extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
+class ForkableFlinkMiniCluster(userConfiguration: Configuration,
+ singleActorSystem: Boolean,
+ streamingMode: StreamingMode)
+ extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
+
- def this(userConfiguration: Configuration) = this(userConfiguration, true)
+ def this(userConfiguration: Configuration, singleActorSystem: Boolean)
+ = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+ def this(userConfiguration: Configuration) = this(userConfiguration, true)
+
+ // --------------------------------------------------------------------------
+
override def generateConfiguration(userConfiguration: Configuration): Configuration = {
val forNumberString = System.getProperty("forkNumber")
@@ -77,10 +86,10 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
-
+
val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
libraryCacheManager, archive, accumulatorManager, executionRetries,
- delayBetweenRetries, timeout) with TestingJobManager)
+ delayBetweenRetries, timeout, streamingMode) with TestingJobManager)
val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
@@ -118,7 +127,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
TaskManager.startTaskManagerComponentsAndActor(config, system, HOSTNAME,
Some(TaskManager.TASK_MANAGER_NAME + index), jobManagerAkkaUrl, localExecution,
- classOf[TestingTaskManager])
+ streamingMode, classOf[TestingTaskManager])
}
def restartJobManager(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
index d55aff4..e001fc8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
@@ -25,6 +25,7 @@ import akka.util.Timeout;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -118,7 +119,7 @@ public abstract class AbstractProcessFailureRecoveryTest {
jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
- ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
+ ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem, StreamingMode.STREAMING)._1();
// the TaskManager java command
String[] command = new String[] {
@@ -369,7 +370,7 @@ public abstract class AbstractProcessFailureRecoveryTest {
cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
- TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, TaskManager.class);
+ TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, StreamingMode.STREAMING, TaskManager.class);
// wait forever
Object lock = new Object();
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 5482fc5..706d197 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmanager.JobManager;
@@ -99,7 +100,7 @@ public class ProcessFailureCancelingITCase {
jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10 s");
jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
- ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
+ ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem, StreamingMode.BATCH_ONLY)._1();
// the TaskManager java command
String[] command = new String[] {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index ab70bcc..31f8560 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -25,7 +25,7 @@ import org.apache.flink.test.util.TestBaseUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
-import org.junit.BeforeClass;
+
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -37,10 +37,11 @@ import java.util.Collection;
@RunWith(Parameterized.class)
public class WebFrontendITCase extends MultipleProgramsTestBase {
- @BeforeClass
- public static void setup() throws Exception{
- cluster = TestBaseUtils.startCluster(1, 4, true);
+ // make sure that the webserver is started for us!
+ static {
+ startWebServer = true;
}
+
public WebFrontendITCase(TestExecutionMode m) {
super(m);
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
index d19f543..5ad5351 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
@@ -19,7 +19,7 @@
package org.apache.flink.api.scala.actions
import org.apache.flink.api.scala._
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.junit.Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index e9b4ffe..8f1e1f8 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.scala.functions
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.junit.Assert.fail
import org.junit.{After, Before, Test, Rule}
import org.junit.rules.TemporaryFolder
@@ -49,7 +49,7 @@ class ClosureCleanerITCase(mode: TestExecutionMode) extends MultipleProgramsTest
@After
def after(): Unit = {
- compareResultsByLinesInMemory(result, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(result, resultPath)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
index c8d6639..b09ecc4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.hadoop.mapred
import org.apache.flink.api.scala._
import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat, TextInputFormat}
@@ -35,7 +35,8 @@ class WordCountMapredITCase extends JavaProgramTestBase {
}
protected override def postSubmit() {
- compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, Array[String](".", "_"))
+ TestBaseUtils.compareResultsByLinesInMemory(WordCountData.COUNTS,
+ resultPath, Array[String](".", "_"))
}
protected def testProgram() {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
index 8988baf..de2d376 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.hadoop.mapreduce
import org.apache.flink.api.scala._
import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.mapreduce.Job
@@ -37,7 +37,8 @@ class WordCountMapreduceITCase extends JavaProgramTestBase {
}
protected override def postSubmit() {
- compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, Array[String](".", "_"))
+ TestBaseUtils.compareResultsByLinesInMemory(WordCountData.COUNTS,
+ resultPath, Array[String](".", "_"))
}
protected def testProgram() {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
index 7e395d9..eb5ee58 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
@@ -22,8 +22,8 @@ import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit.Assert._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -46,7 +46,7 @@ class ScalaCsvReaderWithPOJOITCase(mode: TestExecutionMode) extends MultipleProg
@After
def after(): Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
def createInputData(data: String): String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
index 432a6d4..79f781f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
@@ -21,8 +21,8 @@ import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -48,7 +48,7 @@ class AggregateITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
@After
def after(): Unit = {
- compareResultsByLinesInMemory(expectedResult, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
index eaf4117..448479e 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.apache.flink.util.Collector
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -50,7 +50,7 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
@After
def after(): Unit = {
- compareResultsByLinesInMemory(expectedResult, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
index 512ec6c..82fd1a2 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -49,7 +49,7 @@ class CrossITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
@After
def after(): Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
index 8c10271..cf82ce9 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
@@ -20,8 +20,8 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -46,7 +46,7 @@ class DistinctITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(m
@After
def after(): Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
index 7cf802c..3e98adf 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
@@ -17,20 +17,17 @@
*/
package org.apache.flink.api.scala.operators
+import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, JavaProgramTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.apache.flink.api.scala._
-import org.junit.runners.Parameterized.Parameters
-import scala.collection.JavaConverters._
-import scala.collection.mutable
// TODO case class Tuple2[T1, T2](_1: T1, _2: T2)
// TODO case class Foo(a: Int, b: String
@@ -77,7 +74,7 @@ class ExamplesITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(m
@After
def after(): Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
index 082201b..54ae0e7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
@@ -17,12 +17,14 @@
*/
package org.apache.flink.api.scala.operators
+import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -30,8 +32,6 @@ import org.junit.runners.Parameterized
import scala.collection.JavaConverters._
-import org.apache.flink.api.scala._
-
@RunWith(classOf[Parameterized])
class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
private var resultPath: String = null
@@ -48,7 +48,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
@After
def after(): Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
index 36cc3a7..4d6405d 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
@@ -20,8 +20,8 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -45,7 +45,7 @@ class FirstNITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
@After
def after(): Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
index 61751da..21d7a0b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.apache.flink.util.Collector
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
@@ -50,7 +50,7 @@ class FlatMapITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
@After
def after(): Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
index 1b40205..576ecdf 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.test.javaApiOperators.GroupCombineITCase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.apache.flink.util.Collector
import org.junit._
@@ -67,9 +67,8 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
.combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
.output(new DiscardingOutputFormat[Tuple1[String]])
- env.execute
+ env.execute()
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
index b832647..09f0c51 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
@@ -28,8 +28,8 @@ CustomType}
import org.apache.flink.optimizer.Optimizer
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.apache.flink.util.Collector
import org.hamcrest.core.{IsNot, IsEqual}
import org.junit._
@@ -58,7 +58,7 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
@After
def after(): Unit = {
if(expected != null) {
- compareResultsByLinesInMemory(expected, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
index fc4a9ce..a958250 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.functions.RichJoinFunction
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -48,7 +48,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@After
def after(): Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
index 64fdc1f..d4555b7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -49,7 +49,7 @@ class MapITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@After
def after(): Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
index 08d82d2..7d26643 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunctio
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -46,7 +46,7 @@ class PartitionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
@After
def after(): Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
index a9f420f..69cb6f7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -49,7 +49,7 @@ class ReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
@After
def after(): Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
index 10bb7c5..d94f099 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.junit.Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
index b0b3764..cab6ab7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -51,7 +51,7 @@ class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
@After
def after(): Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
index 48a33f7..2e554fb 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.scala.runtime
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.junit._
import org.junit.rules.TemporaryFolder
@@ -57,7 +57,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
env.execute()
- compareResultsByLinesInMemory("60", resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
}
@Test
@@ -78,7 +78,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
env.execute()
- compareResultsByLinesInMemory("60", resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
}
@Test
@@ -99,7 +99,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
env.execute()
- compareResultsByLinesInMemory("60", resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
}
@Test
@@ -122,7 +122,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
env.execute()
- compareResultsByLinesInMemory("60", resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
}
@Test
@@ -143,7 +143,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
env.execute()
- compareResultsByLinesInMemory("60", resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
}
@Test
@@ -164,7 +164,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
env.execute()
- compareResultsByLinesInMemory("60", resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
}
@Test
@@ -187,7 +187,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
env.execute()
- compareResultsByLinesInMemory("60", resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
}
@Test
@@ -208,7 +208,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
env.execute()
- compareResultsByLinesInMemory("60", resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
}
@Test
@@ -229,7 +229,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
env.execute()
- compareResultsByLinesInMemory("80", resultPath)
+ TestBaseUtils.compareResultsByLinesInMemory("80", resultPath)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
index 3556ec1..564a0bd 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -25,16 +25,19 @@ import java.util.Map;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.yarn.YarnTaskManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.yarn.FlinkYarnClient;
+
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* The entry point for running a TaskManager in a YARN container. The YARN container will invoke
* this class' main method.
@@ -51,8 +54,11 @@ public class YarnTaskManagerRunner {
// try to parse the command line arguments
final Configuration configuration;
+ final StreamingMode mode;
try {
- configuration = TaskManager.parseArgsAndLoadConfig(args);
+ scala.Tuple2<Configuration, StreamingMode> res = TaskManager.parseArgsAndLoadConfig(args);
+ configuration = res._1();
+ mode = res._2();
}
catch (Throwable t) {
LOG.error(t.getMessage(), t);
@@ -87,7 +93,8 @@ public class YarnTaskManagerRunner {
@Override
public Object run() {
try {
- TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, YarnTaskManager.class);
+ TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration,
+ mode, YarnTaskManager.class);
}
catch (Throwable t) {
LOG.error("Error while starting the TaskManager", t);
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 7884edd..c1a937e 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -24,6 +24,7 @@ import akka.actor._
import grizzled.slf4j.Logger
import org.apache.flink.client.CliFrontend
import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
+import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
@@ -54,6 +55,8 @@ object ApplicationMaster {
EnvironmentInformation.logEnvironmentInfo(LOG.logger, "YARN ApplicationMaster/JobManager", args)
EnvironmentInformation.checkJavaVersion()
org.apache.flink.runtime.util.SignalHandler.register(LOG.logger)
+
+ val streamingMode = StreamingMode.BATCH_ONLY
val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername)
@@ -91,9 +94,12 @@ object ApplicationMaster {
val slots = env.get(FlinkYarnClient.ENV_SLOTS).toInt
val dynamicPropertiesEncodedString = env.get(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES)
- val (config, system, jobManager, archiver) = startJobManager(currDir, ownHostname,
- dynamicPropertiesEncodedString)
-
+ val (config: Configuration,
+ system: ActorSystem,
+ jobManager: ActorRef,
+ archiver: ActorRef) = startJobManager(currDir, ownHostname,
+ dynamicPropertiesEncodedString,
+ streamingMode)
actorSystem = system
val extActor = system.asInstanceOf[ExtendedActorSystem]
val jobManagerPort = extActor.provider.getDefaultAddress.port.get
@@ -195,15 +201,12 @@ object ApplicationMaster {
/**
* Starts the JobManager and all its components.
*
- * @param currDir
- * @param hostname
- * @param dynamicPropertiesEncodedString
- *
* @return (Configuration, JobManager ActorSystem, JobManager ActorRef, Archiver ActorRef)
*/
def startJobManager(currDir: String,
hostname: String,
- dynamicPropertiesEncodedString: String):
+ dynamicPropertiesEncodedString: String,
+ streamingMode: StreamingMode):
(Configuration, ActorSystem, ActorRef, ActorRef) = {
LOG.info("Starting JobManager for YARN")
@@ -236,7 +239,7 @@ object ApplicationMaster {
val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
libraryCacheManager, archiver, accumulatorManager, executionRetries,
- delayBetweenRetries, timeout) with ApplicationMasterActor)
+ delayBetweenRetries, timeout, streamingMode) with ApplicationMasterActor)
LOG.debug("Starting JobManager actor")
val jobManager = JobManager.startActor(jobManagerProps, jobManagerSystem)
[4/4] flink git commit: [tests] Adjust tests for dedicated streaming
mode and clean up test bases.
Posted by se...@apache.org.
[tests] Adjust tests for dedicated streaming mode and clean up test bases.
This closes #718
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04377224
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04377224
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04377224
Branch: refs/heads/master
Commit: 0437722449c59fdc21fc5d18a59e5e0b961208af
Parents: efec229
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 22 17:24:19 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 26 16:22:23 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/jobmanager/JobManager.scala | 3 +-
.../JobManagerProcessReapingTest.java | 3 +-
.../jobmanager/JobManagerStartupTest.java | 11 ++-
.../flink/runtime/jobmanager/JobSubmitTest.java | 3 +-
...askManagerComponentsStartupShutdownTest.java | 4 +-
.../TaskManagerProcessReapingTest.java | 5 +-
.../TaskManagerRegistrationTest.java | 9 +-
.../runtime/taskmanager/TaskManagerTest.java | 12 ++-
.../taskmanager/TestManagerStartupTest.java | 10 ++-
.../jobmanager/JobManagerRegistrationTest.scala | 6 +-
.../runtime/testingUtils/TestingCluster.scala | 24 +++--
.../runtime/testingUtils/TestingUtils.scala | 28 +-----
.../api/complex/ComplexIntegrationTest.java | 9 +-
.../util/StreamingMultipleProgramsTestBase.java | 53 ++++++-----
.../util/StreamingProgramTestBase.java | 2 +-
.../scala/table/test/AggregationsITCase.scala | 20 ++---
.../flink/api/scala/table/test/AsITCase.scala | 21 ++---
.../api/scala/table/test/CastingITCase.scala | 14 +--
.../scala/table/test/ExpressionsITCase.scala | 22 ++---
.../api/scala/table/test/FilterITCase.scala | 20 ++---
.../table/test/GroupedAggreagationsITCase.scala | 14 +--
.../flink/api/scala/table/test/JoinITCase.scala | 22 ++---
.../api/scala/table/test/SelectITCase.scala | 22 ++---
.../table/test/StringExpressionsITCase.scala | 16 ++--
.../util/AbstractMultipleProgramsTestBase.java | 84 ------------------
.../flink/test/util/AbstractTestBase.java | 14 +--
.../test/util/MultipleProgramsTestBase.java | 52 ++++++++++-
.../apache/flink/test/util/TestBaseUtils.java | 92 +++++++++++---------
.../apache/flink/test/util/FlinkTestBase.scala | 8 +-
.../test/util/ForkableFlinkMiniCluster.scala | 21 +++--
.../AbstractProcessFailureRecoveryTest.java | 5 +-
.../recovery/ProcessFailureCancelingITCase.java | 3 +-
.../flink/test/web/WebFrontendITCase.java | 9 +-
.../api/scala/actions/CountCollectITCase.scala | 2 +-
.../scala/functions/ClosureCleanerITCase.scala | 6 +-
.../hadoop/mapred/WordCountMapredITCase.scala | 5 +-
.../mapreduce/WordCountMapreduceITCase.scala | 5 +-
.../scala/io/ScalaCsvReaderWithPOJOITCase.scala | 6 +-
.../api/scala/operators/AggregateITCase.scala | 6 +-
.../api/scala/operators/CoGroupITCase.scala | 6 +-
.../flink/api/scala/operators/CrossITCase.scala | 6 +-
.../api/scala/operators/DistinctITCase.scala | 6 +-
.../api/scala/operators/ExamplesITCase.scala | 13 ++-
.../api/scala/operators/FilterITCase.scala | 10 +--
.../api/scala/operators/FirstNITCase.scala | 6 +-
.../api/scala/operators/FlatMapITCase.scala | 6 +-
.../scala/operators/GroupCombineITCase.scala | 5 +-
.../api/scala/operators/GroupReduceITCase.scala | 6 +-
.../flink/api/scala/operators/JoinITCase.scala | 6 +-
.../flink/api/scala/operators/MapITCase.scala | 6 +-
.../api/scala/operators/PartitionITCase.scala | 6 +-
.../api/scala/operators/ReduceITCase.scala | 6 +-
.../api/scala/operators/SumMinMaxITCase.scala | 2 +-
.../flink/api/scala/operators/UnionITCase.scala | 6 +-
.../scala/runtime/ScalaSpecialTypesITCase.scala | 22 ++---
.../yarn/appMaster/YarnTaskManagerRunner.java | 15 +++-
.../apache/flink/yarn/ApplicationMaster.scala | 21 +++--
57 files changed, 421 insertions(+), 404 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ba819ca..d5df633 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -945,7 +945,8 @@ object JobManager {
* @param args command line arguments
* @return Quadruple of configuration, execution mode and an optional listening address
*/
- def parseArgs(args: Array[String]): (Configuration, JobManagerMode, StreamingMode, String, Int) = {
+ def parseArgs(args: Array[String]):
+ (Configuration, JobManagerMode, StreamingMode, String, Int) = {
val parser = new scopt.OptionParser[JobManagerCliOptions]("JobManager") {
head("Flink JobManager")
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index a332ee1..be73bf5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -27,6 +27,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -183,7 +184,7 @@ public class JobManagerProcessReapingTest {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
- JobManager.runJobManager(config, JobManagerMode.CLUSTER, "localhost", port);
+ JobManager.runJobManager(config, JobManagerMode.CLUSTER, StreamingMode.BATCH_ONLY, "localhost", port);
System.exit(0);
}
catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 070e376..e7665a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -28,6 +28,7 @@ import com.google.common.io.Files;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.net.NetUtils;
import org.junit.After;
@@ -80,7 +81,8 @@ public class JobManagerStartupTest {
}
try {
- JobManager.runJobManager(new Configuration(), JobManagerMode.CLUSTER, "localhost", portNum);
+ JobManager.runJobManager(new Configuration(), JobManagerMode.CLUSTER,
+ StreamingMode.BATCH_ONLY, "localhost", portNum);
fail("this should throw an exception");
}
catch (Exception e) {
@@ -94,7 +96,9 @@ public class JobManagerStartupTest {
try {
portOccupier.close();
}
- catch (Throwable t) {}
+ catch (Throwable t) {
+ // ignore
+ }
}
}
@@ -117,7 +121,8 @@ public class JobManagerStartupTest {
failConfig.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, nonExistDirectory);
try {
- JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER, "localhost", portNum);
+ JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER,
+ StreamingMode.BATCH_ONLY, "localhost", portNum);
fail("this should fail with an exception");
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index e69687f..7b6d688 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
@@ -61,7 +62,7 @@ public class JobSubmitTest {
scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.empty();
jobManagerSystem = AkkaUtils.createActorSystem(config, listeningAddress);
- jobManager = JobManager.startJobManagerActors(config, jobManagerSystem)._1();
+ jobManager = JobManager.startJobManagerActors(config, jobManagerSystem, StreamingMode.BATCH_ONLY)._1();
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 7d4994d..a67cd00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -27,6 +27,7 @@ import akka.actor.Props;
import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -68,7 +69,8 @@ public class TaskManagerComponentsStartupShutdownTest {
try {
actorSystem = AkkaUtils.createLocalActorSystem(config);
- final ActorRef jobManager = JobManager.startJobManagerActors(config, actorSystem)._1();
+ final ActorRef jobManager = JobManager.startJobManagerActors(config, actorSystem,
+ StreamingMode.BATCH_ONLY)._1();
// create the components for the TaskManager manually
final TaskManagerConfiguration tmConfig = new TaskManagerConfiguration(
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 91fadca..c55a721 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -24,6 +24,7 @@ import akka.actor.PoisonPill;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
@@ -84,7 +85,7 @@ public class TaskManagerProcessReapingTest {
jmActorSystem = AkkaUtils.createActorSystem(
new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
- JobManager.startJobManagerActors(new Configuration(), jmActorSystem);
+ JobManager.startJobManagerActors(new Configuration(), jmActorSystem, StreamingMode.BATCH_ONLY);
final int taskManagerPort = NetUtils.getAvailablePort();
@@ -198,7 +199,7 @@ public class TaskManagerProcessReapingTest {
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
- TaskManager.runTaskManager("localhost", taskManagerPort, cfg);
+ TaskManager.runTaskManager("localhost", taskManagerPort, cfg, StreamingMode.BATCH_ONLY);
// wait forever
Object lock = new Object();
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 69964ea..3e5f2cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -28,6 +28,7 @@ import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmanager.JobManager;
@@ -152,7 +153,8 @@ public class TaskManagerRegistrationTest {
Thread.sleep(6000);
// now start the JobManager, with the regular akka URL
- final ActorRef jobManager = JobManager.startJobManagerActors(new Configuration(), actorSystem)._1();
+ final ActorRef jobManager =
+ JobManager.startJobManagerActors(new Configuration(), actorSystem, StreamingMode.BATCH_ONLY)._1();
// check that the TaskManagers are registered
Future<Object> responseFuture = Patterns.ask(
@@ -371,6 +373,7 @@ public class TaskManagerRegistrationTest {
NONE_STRING, // no actor name -> random
new Some<String>(jobManager.path().toString()), // job manager path
false, // init network stack !!!
+ StreamingMode.BATCH_ONLY,
TaskManager.class);
watch(taskManager);
@@ -415,7 +418,8 @@ public class TaskManagerRegistrationTest {
private static ActorRef startJobManager() throws Exception {
// start the actors. don't give names, so they get generated names and we
// avoid conflicts with the actor names
- return JobManager.startJobManagerActors(new Configuration(), actorSystem, NONE_STRING, NONE_STRING)._1();
+ return JobManager.startJobManagerActors(new Configuration(), actorSystem,
+ NONE_STRING, NONE_STRING, StreamingMode.BATCH_ONLY)._1();
}
private static ActorRef startTaskManager(ActorRef jobManager) throws Exception {
@@ -430,6 +434,7 @@ public class TaskManagerRegistrationTest {
NONE_STRING, // no actor name -> random
new Some<String>(jobManagerUrl), // job manager path
true, // local network stack only
+ StreamingMode.BATCH_ONLY,
TaskManager.class);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index a308c81..d33dcd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -27,9 +27,11 @@ import akka.japi.Creator;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
+
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -59,11 +61,14 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
@@ -903,7 +908,8 @@ public class TaskManagerTest {
return createTaskManager(jobManager, waitForRegistration, true, NetUtils.getAvailablePort());
}
- public static ActorRef createTaskManager(ActorRef jobManager, boolean waitForRegistration, boolean useLocalCommunication, int dataPort) {
+ public static ActorRef createTaskManager(ActorRef jobManager, boolean waitForRegistration,
+ boolean useLocalCommunication, int dataPort) {
ActorRef taskManager = null;
try {
Configuration cfg = new Configuration();
@@ -916,7 +922,9 @@ public class TaskManagerTest {
cfg, system, "localhost",
Option.<String>empty(),
jobMangerUrl,
- useLocalCommunication, TestingTaskManager.class);
+ useLocalCommunication,
+ StreamingMode.BATCH_ONLY,
+ TestingTaskManager.class);
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
index 2b945e1..a033eb1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.StreamingMode;
import org.junit.Test;
import java.io.File;
@@ -53,7 +54,8 @@ public class TestManagerStartupTest {
final int port = blocker.getLocalPort();
try {
- TaskManager.runTaskManager(localHostName, port, new Configuration(), TaskManager.class);
+ TaskManager.runTaskManager(localHostName, port, new Configuration(),
+ StreamingMode.BATCH_ONLY, TaskManager.class);
fail("This should fail with an IOException");
}
catch (IOException e) {
@@ -101,7 +103,7 @@ public class TestManagerStartupTest {
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
try {
- TaskManager.runTaskManager("localhost", 0, cfg);
+ TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY);
fail("Should fail synchronously with an exception");
}
catch (IOException e) {
@@ -138,7 +140,7 @@ public class TestManagerStartupTest {
// something invalid
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -42);
try {
- TaskManager.runTaskManager("localhost", 0, cfg);
+ TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY);
fail("Should fail synchronously with an exception");
}
catch (IllegalConfigurationException e) {
@@ -150,7 +152,7 @@ public class TestManagerStartupTest {
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE) >> 20;
cfg.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memSize);
try {
- TaskManager.runTaskManager("localhost", 0, cfg);
+ TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY);
fail("Should fail synchronously with an exception");
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 5fde5ea..5ae6b5b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -23,6 +23,7 @@ import java.net.InetAddress
import akka.actor._
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager}
@@ -122,8 +123,9 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
}
private def startTestingJobManager(system: ActorSystem): ActorRef = {
- val (jm: ActorRef, _) = JobManager.startJobManagerActors(
- new Configuration(), _system, None, None)
+ val (jm: ActorRef, _) = JobManager.startJobManagerActors(new Configuration(), _system,
+ None, None,
+ StreamingMode.BATCH_ONLY)
jm
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 9f9fe93..a904f60 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.testingUtils
import akka.actor.{ActorRef, Props, ActorSystem}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import org.apache.flink.runtime.net.NetUtils
@@ -33,9 +34,19 @@ import org.apache.flink.runtime.taskmanager.TaskManager
* @param singleActorSystem true if all actors shall be running in the same [[ActorSystem]],
* otherwise false
*/
-class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true)
- extends FlinkMiniCluster(userConfiguration, singleActorSystem) {
+class TestingCluster(userConfiguration: Configuration,
+ singleActorSystem: Boolean,
+ streamingMode: StreamingMode)
+ extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
+
+ def this(userConfiguration: Configuration, singleActorSystem: Boolean)
+ = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+
+ def this(userConfiguration: Configuration) = this(userConfiguration, true)
+
+ // --------------------------------------------------------------------------
+
override def generateConfiguration(userConfig: Configuration): Configuration = {
val cfg = new Configuration()
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
@@ -52,13 +63,13 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager,
executionRetries, delayBetweenRetries,
timeout, archiveCount) = JobManager.createJobManagerComponents(configuration)
-
+
val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
-
+
val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
libraryCacheManager, archive, accumulatorManager, executionRetries,
- delayBetweenRetries, timeout) with TestingJobManager)
+ delayBetweenRetries, timeout, streamingMode) with TestingJobManager)
actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
}
@@ -72,12 +83,13 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
} else {
None
}
-
+
TaskManager.startTaskManagerComponentsAndActor(configuration, system,
HOSTNAME,
Some(tmActorName),
jobManagerPath,
numTaskManagers == 1,
+ streamingMode,
classOf[TestingTaskManager])
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 63dce31..3611633 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -20,13 +20,14 @@ package org.apache.flink.runtime.testingUtils
import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.CallingThreadDispatcher
+
import com.typesafe.config.ConfigFactory
+
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue
-import org.apache.flink.runtime.taskmanager.TaskManager
-import scala.concurrent.duration._
+import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.language.postfixOps
@@ -55,28 +56,7 @@ object TestingUtils {
}
def getDefaultTestingActorSystemConfig = testConfig
-
-
- def startTestingTaskManagerWithConfiguration(hostname: String,
- jobManagerURL: String,
- config: Configuration,
- system: ActorSystem) : ActorRef = {
-
-
- TaskManager.startTaskManagerComponentsAndActor(config, system,
- hostname,
- None, // random actor name
- Some(jobManagerURL), // job manager
- true, classOf[TestingTaskManager])
- }
-
- def startTestingTaskManager(jobManager: ActorRef, system: ActorSystem): ActorRef = {
-
- val jmURL = jobManager.path.toString
- val config = new Configuration()
-
- startTestingTaskManagerWithConfiguration("localhost", jmURL, config, system)
- }
+
def startTestingCluster(numSlots: Int, numTMs: Int = 1,
timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): TestingCluster = {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 738654a..6fdd839 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -52,8 +52,6 @@ import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import java.io.Serializable;
import java.text.SimpleDateFormat;
@@ -65,7 +63,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
-@RunWith(Parameterized.class)
+@SuppressWarnings("serial")
public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
// *************************************************************************
@@ -76,10 +74,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
private String resultPath2;
private String expected1;
private String expected2;
-
- public ComplexIntegrationTest(TestExecutionMode mode) {
- super(mode);
- }
+
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index 36e62f9..945ac07 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -18,12 +18,12 @@
package org.apache.flink.streaming.util;
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase;
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode;
-import org.junit.runners.Parameterized;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.TestBaseUtils;
-import java.util.ArrayList;
-import java.util.Collection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
/**
* Base class for streaming unit tests that run multiple tests and want to reuse the same
@@ -53,25 +53,34 @@ import java.util.Collection;
*
* }</pre>
*/
-public class StreamingMultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
+public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
- public StreamingMultipleProgramsTestBase(TestExecutionMode mode) {
- super(mode);
- switch(this.mode){
- case CLUSTER:
- TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4);
- clusterEnv.setAsContext();
- break;
- case COLLECTION:
- throw new UnsupportedOperationException("Flink streaming currently has no collection execution backend.");
- }
+ // ------------------------------------------------------------------------
+ // The mini cluster that is shared across tests
+ // ------------------------------------------------------------------------
+
+ protected static final int DEFAULT_PARALLELISM = 4;
+
+ protected static ForkableFlinkMiniCluster cluster = null;
+
+ // ------------------------------------------------------------------------
+
+ public StreamingMultipleProgramsTestBase() {
+ TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4);
+ clusterEnv.setAsContext();
+ }
+
+ // ------------------------------------------------------------------------
+ // Cluster setup & teardown
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void setup() throws Exception{
+ cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.STREAMING, false);
}
- @Parameterized.Parameters(name = "Execution mode = {0}")
- public static Collection<TestExecutionMode[]> executionModes() {
- TestExecutionMode[] tems = new TestExecutionMode[]{TestExecutionMode.CLUSTER};
- ArrayList<TestExecutionMode[]> temsList = new ArrayList<TestExecutionMode[]>();
- temsList.add(tems);
- return temsList;
+ @AfterClass
+ public static void teardown() throws Exception {
+ stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
index 0446b61..23be327 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.util;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.AbstractTestBase;
-import org.junit.Assert;
+import org.junit.Assert;
import org.junit.Test;
public abstract class StreamingProgramTestBase extends AbstractTestBase {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 38be85e..3b7ab8d 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testAggregationTypes: Unit = {
+ def testAggregationTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).toTable
@@ -62,7 +62,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
}
@Test(expected = classOf[ExpressionException])
- def testAggregationOnNonExistingField: Unit = {
+ def testAggregationOnNonExistingField(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).toTable
@@ -74,7 +74,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
}
@Test
- def testWorkingAggregationDataTypes: Unit = {
+ def testWorkingAggregationDataTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(
@@ -88,7 +88,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
}
@Test
- def testAggregationWithArithmetic: Unit = {
+ def testAggregationWithArithmetic(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
@@ -100,7 +100,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
}
@Test(expected = classOf[ExpressionException])
- def testNonWorkingAggregationDataTypes: Unit = {
+ def testNonWorkingAggregationDataTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(("Hello", 1)).toTable
@@ -112,7 +112,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
}
@Test(expected = classOf[ExpressionException])
- def testNoNestedAggregations: Unit = {
+ def testNoNestedAggregations(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(("Hello", 1)).toTable
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index 3a0cc69..c6259ec 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -32,6 +32,7 @@ import org.junit.runners.Parameterized
@RunWith(classOf[Parameterized])
class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+
private var resultPath: String = null
private var expected: String = ""
private val _tempFolder = new TemporaryFolder()
@@ -45,12 +46,12 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testAs: Unit = {
+ def testAs(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
@@ -66,7 +67,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
}
@Test(expected = classOf[ExpressionException])
- def testAsWithToFewFields: Unit = {
+ def testAsWithToFewFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
@@ -77,7 +78,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
}
@Test(expected = classOf[ExpressionException])
- def testAsWithToManyFields: Unit = {
+ def testAsWithToManyFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
@@ -88,7 +89,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
}
@Test(expected = classOf[ExpressionException])
- def testAsWithAmbiguousFields: Unit = {
+ def testAsWithAmbiguousFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
@@ -99,7 +100,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
}
@Test(expected = classOf[ExpressionException])
- def testAsWithNonFieldReference1: Unit = {
+ def testAsWithNonFieldReference1(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
// as can only have field references
@@ -111,7 +112,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
}
@Test(expected = classOf[ExpressionException])
- def testAsWithNonFieldReference2: Unit = {
+ def testAsWithNonFieldReference2(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
// as can only have field references
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 9557985..736cf68 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -21,8 +21,8 @@ package org.apache.flink.api.scala.table.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -43,12 +43,12 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testAutoCastToString: Unit = {
+ def testAutoCastToString(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
@@ -60,7 +60,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
}
@Test
- def testNumericAutoCastInArithmetic: Unit = {
+ def testNumericAutoCastInArithmetic(): Unit = {
// don't test everything, just some common cast directions
@@ -74,7 +74,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
}
@Test
- def testNumericAutoCastInComparison: Unit = {
+ def testNumericAutoCastInComparison(): Unit = {
// don't test everything, just some common cast directions
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 8c60acf..d9de287 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.table.ExpressionException
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -44,12 +44,12 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testArithmetic: Unit = {
+ def testArithmetic(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((5, 10)).as('a, 'b)
@@ -61,7 +61,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
}
@Test
- def testLogic: Unit = {
+ def testLogic(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((5, true)).as('a, 'b)
@@ -73,7 +73,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
}
@Test
- def testComparisons: Unit = {
+ def testComparisons(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
@@ -85,7 +85,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
}
@Test
- def testBitwiseOperations: Unit = {
+ def testBitwiseOperations(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
@@ -98,7 +98,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
}
@Test
- def testBitwiseWithAutocast: Unit = {
+ def testBitwiseWithAutocast(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
@@ -111,7 +111,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
}
@Test(expected = classOf[ExpressionException])
- def testBitwiseWithNonWorkingAutocast: Unit = {
+ def testBitwiseWithNonWorkingAutocast(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
@@ -124,7 +124,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
}
@Test
- def testCaseInsensitiveForAs: Unit = {
+ def testCaseInsensitiveForAs(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 982a302..bc51a7e 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -24,8 +24,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -47,12 +47,12 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testAllRejectingFilter: Unit = {
+ def testAllRejectingFilter(): Unit = {
/*
* Test all-rejecting filter.
*/
@@ -67,7 +67,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test
- def testAllPassingFilter: Unit = {
+ def testAllPassingFilter(): Unit = {
/*
* Test all-passing filter.
*/
@@ -87,7 +87,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test
- def testFilterOnStringTupleField: Unit = {
+ def testFilterOnStringTupleField(): Unit = {
/*
* Test filter on String tuple field.
*/
@@ -100,7 +100,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test
- def testFilterOnIntegerTupleField: Unit = {
+ def testFilterOnIntegerTupleField(): Unit = {
/*
* Test filter on Integer tuple field.
*/
@@ -120,7 +120,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
@Ignore
@Test
- def testFilterBasicType: Unit = {
+ def testFilterBasicType(): Unit = {
/*
* Test filter on basic type
*/
@@ -137,7 +137,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
@Ignore
@Test
- def testFilterOnCustomType: Unit = {
+ def testFilterOnCustomType(): Unit = {
/*
* Test filter on custom type
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
index 1f29722..d76d75c 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test(expected = classOf[ExpressionException])
- def testGroupingOnNonExistentField: Unit = {
+ def testGroupingOnNonExistentField(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
@@ -63,7 +63,7 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
}
@Test
- def testGroupedAggregate: Unit = {
+ def testGroupedAggregate(): Unit = {
// the grouping key needs to be forwarded to the intermediate DataSet, even
// if we don't want the key in the output
@@ -79,7 +79,7 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
}
@Test
- def testGroupingKeyForwardIfNotUsed: Unit = {
+ def testGroupingKeyForwardIfNotUsed(): Unit = {
// the grouping key needs to be forwarded to the intermediate DataSet, even
// if we don't want the key in the output
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index 17221d8..b3baa56 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testJoin: Unit = {
+ def testJoin(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -63,7 +63,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
}
@Test
- def testJoinWithFilter: Unit = {
+ def testJoinWithFilter(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -76,7 +76,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
}
@Test
- def testJoinWithMultipleKeys: Unit = {
+ def testJoinWithMultipleKeys(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -90,7 +90,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
}
@Test(expected = classOf[ExpressionException])
- def testJoinNonExistingKey: Unit = {
+ def testJoinNonExistingKey(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -103,7 +103,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
}
@Test(expected = classOf[ExpressionException])
- def testJoinWithNonMatchingKeyTypes: Unit = {
+ def testJoinWithNonMatchingKeyTypes(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -116,7 +116,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
}
@Test(expected = classOf[ExpressionException])
- def testJoinWithAmbiguousFields: Unit = {
+ def testJoinWithAmbiguousFields(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c)
@@ -129,7 +129,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
}
@Test
- def testJoinWithAggregation: Unit = {
+ def testJoinWithAggregation(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 6ba6c9f..1a13d93 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testSimpleSelectAll: Unit = {
+ def testSimpleSelectAll(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).toTable.select('_1, '_2, '_3)
@@ -66,7 +66,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test
- def testSimpleSelectAllWithAs: Unit = {
+ def testSimpleSelectAllWithAs(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
@@ -82,7 +82,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test
- def testSimpleSelectWithNaming: Unit = {
+ def testSimpleSelectWithNaming(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).toTable
@@ -97,7 +97,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test(expected = classOf[ExpressionException])
- def testAsWithToFewFields: Unit = {
+ def testAsWithToFewFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
@@ -108,7 +108,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test(expected = classOf[ExpressionException])
- def testAsWithToManyFields: Unit = {
+ def testAsWithToManyFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
@@ -119,7 +119,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test(expected = classOf[ExpressionException])
- def testAsWithAmbiguousFields: Unit = {
+ def testAsWithAmbiguousFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
@@ -131,7 +131,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
@Test(expected = classOf[ExpressionException])
- def testOnlyFieldRefInAs: Unit = {
+ def testOnlyFieldRefInAs(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd)
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index 3f0f46f..65fe77a 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.table.ExpressionException
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -44,12 +44,12 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testSubstring: Unit = {
+ def testSubstring(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
.select('a.substring(0, 'b))
@@ -60,7 +60,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
}
@Test
- def testSubstringWithMaxEnd: Unit = {
+ def testSubstringWithMaxEnd(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
.select('a.substring('b))
@@ -71,7 +71,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
}
@Test(expected = classOf[ExpressionException])
- def testNonWorkingSubstring1: Unit = {
+ def testNonWorkingSubstring1(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
@@ -83,7 +83,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
}
@Test(expected = classOf[ExpressionException])
- def testNonWorkingSubstring2: Unit = {
+ def testNonWorkingSubstring2(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
deleted file mode 100644
index ef81dfe..0000000
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-/**
- * Abstract base class for unit tests that run multiple tests and want to reuse the same
- * Flink cluster. This saves a significant amount of time, since the startup and
- * shutdown of the Flink clusters (including actor systems, etc) usually dominates
- * the execution of the actual tests.
- *
- * To write a unit test against this test base, simply extend it and add
- * one or more regular test methods and retrieve the ExecutionEnvironment from
- * the context:
- *
- * <pre>{@code
- *
- * @Test
- * public void someTest() {
- * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- * // test code
- * env.execute();
- * }
- *
- * @Test
- * public void anotherTest() {
- * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- * // test code
- * env.execute();
- * }
- *
- * }</pre>
- */
-public abstract class AbstractMultipleProgramsTestBase extends TestBaseUtils {
-
- /**
- * Enum that defines which execution environment to run the next test on:
- * An embedded local flink cluster, or the collection execution backend.
- */
- public enum TestExecutionMode {
- CLUSTER,
- COLLECTION
- }
-
- // -----------------------------------------------------------------------------------------...
-
- private static final int DEFAULT_PARALLELISM = 4;
-
- protected static ForkableFlinkMiniCluster cluster = null;
-
- protected transient TestExecutionMode mode;
-
- public AbstractMultipleProgramsTestBase(TestExecutionMode mode){
- this.mode = mode;
- }
-
- @BeforeClass
- public static void setup() throws Exception{
- cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false);
- }
-
- @AfterClass
- public static void teardown() throws Exception {
- stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index c8b79ef..a5a7da9 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
@@ -36,15 +37,16 @@ public abstract class AbstractTestBase extends TestBaseUtils {
protected final Configuration config;
- protected ForkableFlinkMiniCluster executor;
-
private final List<File> tempFiles;
+ private final FiniteDuration timeout;
+
protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
protected int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS;
-
- private final FiniteDuration timeout;
+
+ protected ForkableFlinkMiniCluster executor;
+
public AbstractTestBase(Configuration config) {
this.config = config;
@@ -57,8 +59,8 @@ public abstract class AbstractTestBase extends TestBaseUtils {
// Local Test Cluster Life Cycle
// --------------------------------------------------------------------------------------------
- public void startCluster() throws Exception{
- this.executor = startCluster(numTaskManagers, taskManagerNumSlots, false);
+ public void startCluster() throws Exception {
+ this.executor = startCluster(numTaskManagers, taskManagerNumSlots, StreamingMode.BATCH_ONLY, false);
}
public void stopCluster() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index e0c4360..8dab485 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,6 +18,9 @@
package org.apache.flink.test.util;
+import org.apache.flink.runtime.StreamingMode;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.runners.Parameterized;
import java.util.Arrays;
@@ -51,11 +54,36 @@ import java.util.Collection;
*
* }</pre>
*/
-public class MultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
+public class MultipleProgramsTestBase extends TestBaseUtils {
+ /**
+ * Enum that defines which execution environment to run the next test on:
+ * An embedded local flink cluster, or the collection execution backend.
+ */
+ public enum TestExecutionMode {
+ CLUSTER,
+ COLLECTION
+ }
+
+ // ------------------------------------------------------------------------
+ // The mini cluster that is shared across tests
+ // ------------------------------------------------------------------------
+
+ protected static final int DEFAULT_PARALLELISM = 4;
+
+ protected static boolean startWebServer = false;
+
+ protected static ForkableFlinkMiniCluster cluster = null;
+
+ // ------------------------------------------------------------------------
+
+ protected final TestExecutionMode mode;
+
+
public MultipleProgramsTestBase(TestExecutionMode mode){
- super(mode);
- switch(this.mode){
+ this.mode = mode;
+
+ switch(mode){
case CLUSTER:
TestEnvironment clusterEnv = new TestEnvironment(cluster, 4);
clusterEnv.setAsContext();
@@ -67,6 +95,24 @@ public class MultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
}
}
+ // ------------------------------------------------------------------------
+ // Cluster setup & teardown
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void setup() throws Exception{
+ cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.BATCH_ONLY, startWebServer);
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+ }
+
+ // ------------------------------------------------------------------------
+ // Parametrization lets the tests run in cluster and collection mode
+ // ------------------------------------------------------------------------
+
@Parameterized.Parameters(name = "Execution mode = {0}")
public static Collection<TestExecutionMode[]> executionModes(){
return Arrays.asList(new TestExecutionMode[]{TestExecutionMode.CLUSTER},
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index b0e4ae9..29cb2a4 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -27,6 +27,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.hadoop.fs.FileSystem;
@@ -79,46 +80,55 @@ public class TestBaseUtils {
protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
- protected static FiniteDuration DEFAULT_TIMEOUT = new FiniteDuration
- (DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
+ protected static FiniteDuration DEFAULT_TIMEOUT = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
+ // ------------------------------------------------------------------------
+
protected static File logDir;
protected TestBaseUtils(){
verifyJvmOptions();
}
-
- private void verifyJvmOptions() {
+
+ private static void verifyJvmOptions() {
long heap = Runtime.getRuntime().maxMemory() >> 20;
Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
}
-
- protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers, int
- taskManagerNumSlots, boolean startWebserver) throws Exception {
+
+
+ protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers,
+ int taskManagerNumSlots,
+ StreamingMode mode,
+ boolean startWebserver) throws Exception {
+
logDir = File.createTempFile("TestBaseUtils-logdir", null);
Assert.assertTrue("Unable to delete temp file", logDir.delete());
Assert.assertTrue("Unable to create temp directory", logDir.mkdir());
+
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
- config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, true);
+
+ config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+
config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
- config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
+
config.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, startWebserver);
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.toString());
- return new ForkableFlinkMiniCluster(config);
+
+ return new ForkableFlinkMiniCluster(config, true, mode);
}
- protected static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout)
- throws Exception {
- if(logDir != null) {
- logDir.delete();
+ protected static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
+ if (logDir != null) {
+ FileUtils.deleteDirectory(logDir);
}
- if(executor != null) {
+ if (executor != null) {
int numUnreleasedBCVars = 0;
int numActiveConnections = 0;
{
@@ -169,11 +179,11 @@ public class TestBaseUtils {
// Result Checking
// --------------------------------------------------------------------------------------------
- public BufferedReader[] getResultReader(String resultPath) throws IOException {
+ public static BufferedReader[] getResultReader(String resultPath) throws IOException {
return getResultReader(resultPath, new String[]{}, false);
}
- public BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes,
+ public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes,
boolean inOrderOfFiles) throws IOException {
File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
@@ -206,12 +216,11 @@ public class TestBaseUtils {
- public BufferedInputStream[] getResultInputStream(String resultPath) throws
- IOException {
+ public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException {
return getResultInputStream(resultPath, new String[]{});
}
- public BufferedInputStream[] getResultInputStream(String resultPath, String[]
+ public static BufferedInputStream[] getResultInputStream(String resultPath, String[]
excludePrefixes) throws IOException {
File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
BufferedInputStream[] inStreams = new BufferedInputStream[files.length];
@@ -221,37 +230,37 @@ public class TestBaseUtils {
return inStreams;
}
- public void readAllResultLines(List<String> target, String resultPath) throws
- IOException {
+ public static void readAllResultLines(List<String> target, String resultPath) throws IOException {
readAllResultLines(target, resultPath, new String[]{});
}
- public void readAllResultLines(List<String> target, String resultPath, String[]
- excludePrefixes) throws IOException {
+ public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes)
+ throws IOException {
+
readAllResultLines(target, resultPath, excludePrefixes, false);
}
- public void readAllResultLines(List<String> target, String resultPath, String[]
+ public static void readAllResultLines(List<String> target, String resultPath, String[]
excludePrefixes, boolean inOrderOfFiles) throws IOException {
for (BufferedReader reader : getResultReader(resultPath, excludePrefixes, inOrderOfFiles)) {
- String s = null;
+ String s;
while ((s = reader.readLine()) != null) {
target.add(s);
}
}
}
- public void compareResultsByLinesInMemory(String expectedResultStr, String
+ public static void compareResultsByLinesInMemory(String expectedResultStr, String
resultPath) throws Exception {
compareResultsByLinesInMemory(expectedResultStr, resultPath, new String[]{});
}
- public void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,
+ public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,
String[] excludePrefixes) throws Exception {
ArrayList<String> list = new ArrayList<String>();
readAllResultLines(list, resultPath, excludePrefixes, false);
- String[] result = (String[]) list.toArray(new String[list.size()]);
+ String[] result = list.toArray(new String[list.size()]);
Arrays.sort(result);
String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
@@ -261,18 +270,18 @@ public class TestBaseUtils {
Assert.assertArrayEquals(expected, result);
}
- public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
+ public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
String resultPath) throws
Exception {
compareResultsByLinesInMemoryWithStrictOrder(expectedResultStr, resultPath, new String[]{});
}
- public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
+ public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
String resultPath, String[] excludePrefixes) throws Exception {
ArrayList<String> list = new ArrayList<String>();
readAllResultLines(list, resultPath, excludePrefixes, true);
- String[] result = (String[]) list.toArray(new String[list.size()]);
+ String[] result = list.toArray(new String[list.size()]);
String[] expected = expectedResultStr.split("\n");
@@ -280,7 +289,7 @@ public class TestBaseUtils {
Assert.assertArrayEquals(expected, result);
}
- public void checkLinesAgainstRegexp(String resultPath, String regexp){
+ public static void checkLinesAgainstRegexp(String resultPath, String regexp){
Pattern pattern = Pattern.compile(regexp);
Matcher matcher = pattern.matcher("");
@@ -301,17 +310,17 @@ public class TestBaseUtils {
}
- public void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
+ public static void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
String delimiter, double maxDelta) throws Exception {
compareKeyValueParisWithDelta(expectedLines, resultPath, new String[]{}, delimiter, maxDelta);
}
- public void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
+ public static void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
String[] excludePrefixes, String delimiter, double maxDelta) throws Exception {
ArrayList<String> list = new ArrayList<String>();
readAllResultLines(list, resultPath, excludePrefixes, false);
- String[] result = (String[]) list.toArray(new String[list.size()]);
+ String[] result = list.toArray(new String[list.size()]);
String[] expected = expectedLines.isEmpty() ? new String[0] : expectedLines.split("\n");
Assert.assertEquals("Wrong number of result lines.", expected.length, result.length);
@@ -330,7 +339,7 @@ public class TestBaseUtils {
}
}
- public <X> void compareResultCollections(List<X> expected, List<X> actual,
+ public static <X> void compareResultCollections(List<X> expected, List<X> actual,
Comparator<X> comparator) {
Assert.assertEquals(expected.size(), actual.size());
@@ -445,8 +454,8 @@ public class TestBaseUtils {
protected static void deleteRecursively(File f) throws IOException {
if (f.isDirectory()) {
FileUtils.deleteDirectory(f);
- } else {
- f.delete();
+ } else if (!f.delete()) {
+ System.err.println("Failed to delete file " + f.getAbsolutePath());
}
}
@@ -469,13 +478,13 @@ public class TestBaseUtils {
// Web utils
//---------------------------------------------------------------------------------------------
- public static String getFromHTTP(String url) throws Exception{
+ public static String getFromHTTP(String url) throws Exception {
URL u = new URL(url);
LOG.info("Accessing URL "+url+" as URL: "+u);
HttpURLConnection connection = (HttpURLConnection) u.openConnection();
connection.setConnectTimeout(100000);
connection.connect();
- InputStream is = null;
+ InputStream is;
if(connection.getResponseCode() >= 400) {
// error!
LOG.warn("HTTP Response code when connecting to {} was {}", url, connection.getResponseCode());
@@ -486,5 +495,4 @@ public class TestBaseUtils {
return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8");
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
index 2e664c1..3ea8624 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
@@ -18,6 +18,7 @@
package org.apache.flink.test.util
+import org.apache.flink.runtime.StreamingMode
import org.scalatest.{Suite, BeforeAndAfter}
/** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala based tests.
@@ -46,17 +47,16 @@ import org.scalatest.{Suite, BeforeAndAfter}
* }}}
*
*/
-trait FlinkTestBase
- extends BeforeAndAfter {
+trait FlinkTestBase extends BeforeAndAfter {
that: Suite =>
var cluster: Option[ForkableFlinkMiniCluster] = None
val parallelism = 4
before {
- val cl = TestBaseUtils.startCluster(1, parallelism, false)
+ val cl = TestBaseUtils.startCluster(1, parallelism, StreamingMode.BATCH_ONLY, false)
val clusterEnvironment = new TestEnvironment(cl, parallelism)
- clusterEnvironment.setAsContext
+ clusterEnvironment.setAsContext()
cluster = Some(cl)
}
[2/4] flink git commit: [FLINK-2085] [runtime] Add an option to let
the MemoryManager allocate and release memory as needed.
Posted by se...@apache.org.
[FLINK-2085] [runtime] Add an option to let the MemoryManager allocate and release memory as needed.
This is an alternative mode to the current mode that pre-allocates all memory.
The default remains to pre-allocate all memory.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea60678e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea60678e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea60678e
Branch: refs/heads/master
Commit: ea60678e9ecdcf954cddf0ce6d0e0b6da01b4cbc
Parents: 924830f
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 22 18:35:40 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 26 16:22:22 2015 +0200
----------------------------------------------------------------------
.../memorymanager/DefaultMemoryManager.java | 138 ++++++++----
.../flink/runtime/taskmanager/TaskManager.scala | 6 +-
.../flink/runtime/io/disk/ChannelViewsTest.java | 2 +-
.../io/disk/FileChannelStreamsITCase.java | 2 +-
.../runtime/io/disk/FileChannelStreamsTest.java | 4 +-
.../disk/SeekableFileChannelInputViewTest.java | 2 +-
.../io/disk/iomanager/IOManagerITCase.java | 4 +-
.../IOManagerPerformanceBenchmark.java | 4 +-
.../memory/DefaultMemoryManagerTest.java | 197 -----------------
.../memory/MemoryManagerLazyAllocationTest.java | 209 +++++++++++++++++++
.../flink/runtime/memory/MemoryManagerTest.java | 208 ++++++++++++++++++
.../flink/runtime/memory/MemorySegmentTest.java | 4 +-
.../operators/drivers/TestTaskContext.java | 2 +-
.../NonReusingReOpenableHashTableITCase.java | 10 +-
.../hash/ReusingReOpenableHashTableITCase.java | 10 +-
.../sort/LargeRecordHandlerITCase.java | 4 +-
.../operators/sort/LargeRecordHandlerTest.java | 6 +-
...askManagerComponentsStartupShutdownTest.java | 2 +-
18 files changed, 539 insertions(+), 275 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
index 28ebe13..b041ac9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
@@ -66,16 +66,21 @@ public class DefaultMemoryManager implements MemoryManager {
private final int pageSizeBits; // the number of bits that the power-of-two page size corresponds to
private final int totalNumPages; // The initial total size, for verification.
-
- private boolean isShutDown; // flag whether the close() has already been invoked.
-
- /**
- * Number of slots of the task manager
- */
- private final int numberOfSlots;
+ /** The total size of the memory managed by this memory manager */
private final long memorySize;
+ /** Number of slots of the task manager */
+ private final int numberOfSlots;
+
+ private final boolean isPreAllocated;
+
+ /** The number of memory pages that have not been allocated and are available for lazy allocation */
+ private int numNonAllocatedPages;
+
+ /** flag whether the close() has already been invoked */
+ private boolean isShutDown;
+
// ------------------------------------------------------------------------
// Constructors / Destructors
// ------------------------------------------------------------------------
@@ -87,7 +92,7 @@ public class DefaultMemoryManager implements MemoryManager {
* @param numberOfSlots The number of slots of the task manager.
*/
public DefaultMemoryManager(long memorySize, int numberOfSlots) {
- this(memorySize, numberOfSlots, DEFAULT_PAGE_SIZE);
+ this(memorySize, numberOfSlots, DEFAULT_PAGE_SIZE, true);
}
/**
@@ -96,8 +101,10 @@ public class DefaultMemoryManager implements MemoryManager {
* @param memorySize The total size of the memory to be managed by this memory manager.
* @param numberOfSlots The number of slots of the task manager.
* @param pageSize The size of the pages handed out by the memory manager.
+ * @param preAllocateMemory True, if the memory manaber should immediately allocate all memory, false
+ * if it should allocate and release the memory as needed.
*/
- public DefaultMemoryManager(long memorySize, int numberOfSlots, int pageSize) {
+ public DefaultMemoryManager(long memorySize, int numberOfSlots, int pageSize, boolean preAllocateMemory) {
// sanity checks
if (memorySize <= 0) {
throw new IllegalArgumentException("Size of total memory must be positive.");
@@ -132,11 +139,17 @@ public class DefaultMemoryManager implements MemoryManager {
this.freeSegments = new ArrayDeque<byte[]>(this.totalNumPages);
this.allocatedSegments = new HashMap<AbstractInvokable, Set<DefaultMemorySegment>>();
+ this.isPreAllocated = preAllocateMemory;
- // add the full chunks
- for (int i = 0; i < this.totalNumPages; i++) {
- // allocate memory of the specified size
- this.freeSegments.add(new byte[this.pageSize]);
+ if (preAllocateMemory) {
+ // add the full chunks
+ for (int i = 0; i < this.totalNumPages; i++) {
+ // allocate memory of the specified size
+ this.freeSegments.add(new byte[this.pageSize]);
+ }
+ }
+ else {
+ this.numNonAllocatedPages = this.totalNumPages;
}
}
@@ -147,12 +160,14 @@ public class DefaultMemoryManager implements MemoryManager {
{
if (!this.isShutDown) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Shutting down MemoryManager instance " + toString());
+ LOG.debug("Shutting down MemoryManager instance " + this);
}
// mark as shutdown and release memory
this.isShutDown = true;
+
this.freeSegments.clear();
+ this.numNonAllocatedPages = 0;
// go over all allocated segments and release them
for (Set<DefaultMemorySegment> segments : this.allocatedSegments.values()) {
@@ -173,7 +188,9 @@ public class DefaultMemoryManager implements MemoryManager {
@Override
public boolean verifyEmpty() {
synchronized (this.lock) {
- return this.freeSegments.size() == this.totalNumPages;
+ return isPreAllocated ?
+ this.freeSegments.size() == this.totalNumPages :
+ this.numNonAllocatedPages == this.totalNumPages;
}
}
@@ -209,7 +226,9 @@ public class DefaultMemoryManager implements MemoryManager {
throw new IllegalStateException("Memory manager has been shut down.");
}
- if (numPages > this.freeSegments.size()) {
+ // in the case of pre-allocated memory, the 'numNonAllocatedPages' is zero, in the
+ // lazy case, the 'freeSegments.size()' is zero.
+ if (numPages > (this.freeSegments.size() + numNonAllocatedPages)) {
throw new MemoryAllocationException("Could not allocate " + numPages + " pages. Only " +
this.freeSegments.size() + " pages are remaining.");
}
@@ -220,11 +239,22 @@ public class DefaultMemoryManager implements MemoryManager {
this.allocatedSegments.put(owner, segmentsForOwner);
}
- for (int i = numPages; i > 0; i--) {
- byte[] buffer = this.freeSegments.poll();
- final DefaultMemorySegment segment = new DefaultMemorySegment(owner, buffer);
- target.add(segment);
- segmentsForOwner.add(segment);
+ if (isPreAllocated) {
+ for (int i = numPages; i > 0; i--) {
+ byte[] buffer = this.freeSegments.poll();
+ final DefaultMemorySegment segment = new DefaultMemorySegment(owner, buffer);
+ target.add(segment);
+ segmentsForOwner.add(segment);
+ }
+ }
+ else {
+ for (int i = numPages; i > 0; i--) {
+ byte[] buffer = new byte[pageSize];
+ final DefaultMemorySegment segment = new DefaultMemorySegment(owner, buffer);
+ target.add(segment);
+ segmentsForOwner.add(segment);
+ }
+ numNonAllocatedPages -= numPages;
}
}
// -------------------- END CRITICAL SECTION -------------------
@@ -259,14 +289,19 @@ public class DefaultMemoryManager implements MemoryManager {
this.allocatedSegments.remove(owner);
}
}
+
+ byte[] buffer = defSeg.destroy();
+
+ if (isPreAllocated) {
+ // release the memory in any case
+ this.freeSegments.add(buffer);
+ }
+ else {
+ numNonAllocatedPages++;
+ }
}
catch (Throwable t) {
- LOG.error("Error removing book-keeping reference to allocated memory segment.", t);
- }
- finally {
- // release the memory in any case
- byte[] buffer = defSeg.destroy();
- this.freeSegments.add(buffer);
+ throw new RuntimeException("Error removing book-keeping reference to allocated memory segment.", t);
}
}
// -------------------- END CRITICAL SECTION -------------------
@@ -286,8 +321,7 @@ public class DefaultMemoryManager implements MemoryManager {
}
// since concurrent modifications to the collection
- // can disturb the release, we need to try potentially
- // multiple times
+ // can disturb the release, we need to try potentially multiple times
boolean successfullyReleased = false;
do {
final Iterator<T> segmentsIterator = segments.iterator();
@@ -322,12 +356,20 @@ public class DefaultMemoryManager implements MemoryManager {
this.allocatedSegments.remove(owner);
}
}
- } catch (Throwable t) {
- LOG.error("Error removing book-keeping reference to allocated memory segment.", t);
- } finally {
+
// release the memory in any case
byte[] buffer = defSeg.destroy();
- this.freeSegments.add(buffer);
+
+ if (isPreAllocated) {
+ this.freeSegments.add(buffer);
+ }
+ else {
+ numNonAllocatedPages++;
+ }
+ }
+ catch (Throwable t) {
+ throw new RuntimeException(
+ "Error removing book-keeping reference to allocated memory segment.", t);
}
}
@@ -363,9 +405,17 @@ public class DefaultMemoryManager implements MemoryManager {
}
// free each segment
- for (DefaultMemorySegment seg : segments) {
- final byte[] buffer = seg.destroy();
- this.freeSegments.add(buffer);
+ if (isPreAllocated) {
+ for (DefaultMemorySegment seg : segments) {
+ final byte[] buffer = seg.destroy();
+ this.freeSegments.add(buffer);
+ }
+ }
+ else {
+ for (DefaultMemorySegment seg : segments) {
+ seg.destroy();
+ }
+ numNonAllocatedPages += segments.size();
}
segments.clear();
@@ -387,12 +437,16 @@ public class DefaultMemoryManager implements MemoryManager {
@Override
public int computeNumberOfPages(double fraction) {
- return getRelativeNumPages(fraction);
+ if (fraction <= 0 || fraction > 1) {
+ throw new IllegalArgumentException("The fraction of memory to allocate must within (0, 1].");
+ }
+
+ return (int)(this.totalNumPages * fraction / this.numberOfSlots);
}
@Override
public long computeMemorySize(double fraction) {
- return this.pageSize*computeNumberOfPages(fraction);
+ return this.pageSize * computeNumberOfPages(fraction);
}
@Override
@@ -414,14 +468,6 @@ public class DefaultMemoryManager implements MemoryManager {
throw new IllegalArgumentException("The given number of bytes corresponds to more than MAX_INT pages.");
}
}
-
- private int getRelativeNumPages(double fraction){
- if (fraction <= 0 || fraction > 1) {
- throw new IllegalArgumentException("The fraction of memory to allocate must within (0, 1].");
- }
-
- return (int)(this.totalNumPages * fraction / this.numberOfSlots);
- }
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 65ce7dc..a95b5cb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1396,8 +1396,10 @@ object TaskManager {
val memoryManager = try {
new DefaultMemoryManager(memorySize,
taskManagerConfig.numberOfSlots,
- netConfig.networkBufferSize)
- } catch {
+ netConfig.networkBufferSize,
+ true)
+ }
+ catch {
case e: OutOfMemoryError => throw new Exception(
"OutOfMemory error (" + e.getMessage + ") while allocating the TaskManager memory (" +
memorySize + " bytes).", e)
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index 0462b3f..f2a5e2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -78,7 +78,7 @@ public class ChannelViewsTest
@Before
public void beforeTest() {
- this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE);
+ this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE, true);
this.ioManager = new IOManagerAsync();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
index 85d2113..dcc1e5f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
@@ -66,7 +66,7 @@ public class FileChannelStreamsITCase {
@Before
public void beforeTest() {
- memManager = new DefaultMemoryManager(NUM_MEMORY_SEGMENTS * MEMORY_PAGE_SIZE, 1, MEMORY_PAGE_SIZE);
+ memManager = new DefaultMemoryManager(NUM_MEMORY_SEGMENTS * MEMORY_PAGE_SIZE, 1, MEMORY_PAGE_SIZE, true);
ioManager = new IOManagerAsync();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
index 1f6899d..f9b8b38 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
@@ -44,7 +44,7 @@ public class FileChannelStreamsTest {
public void testCloseAndDeleteOutputView() {
final IOManager ioManager = new IOManagerAsync();
try {
- MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024);
+ MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024, true);
List<MemorySegment> memory = new ArrayList<MemorySegment>();
memMan.allocatePages(new DummyInvokable(), memory, 4);
@@ -78,7 +78,7 @@ public class FileChannelStreamsTest {
public void testCloseAndDeleteInputView() {
final IOManager ioManager = new IOManagerAsync();
try {
- MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024);
+ MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024, true);
List<MemorySegment> memory = new ArrayList<MemorySegment>();
memMan.allocatePages(new DummyInvokable(), memory, 4);
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
index f090ef1..c071bef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
@@ -45,7 +45,7 @@ public class SeekableFileChannelInputViewTest {
// integers across 7.x pages (7 pages = 114.688 bytes, 8 pages = 131.072 bytes)
try {
- MemoryManager memMan = new DefaultMemoryManager(4 * PAGE_SIZE, 1, PAGE_SIZE);
+ MemoryManager memMan = new DefaultMemoryManager(4 * PAGE_SIZE, 1, PAGE_SIZE, true);
List<MemorySegment> memory = new ArrayList<MemorySegment>();
memMan.allocatePages(new DummyInvokable(), memory, 4);
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index 9abedb3..a0a8356 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -34,7 +34,7 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.DefaultMemoryManagerTest;
+import org.apache.flink.runtime.memory.MemoryManagerTest;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
/**
@@ -82,7 +82,7 @@ public class IOManagerITCase {
@SuppressWarnings("unchecked")
public void parallelChannelsTest() throws Exception {
final Random rnd = new Random(SEED);
- final AbstractInvokable memOwner = new DefaultMemoryManagerTest.DummyInvokable();
+ final AbstractInvokable memOwner = new MemoryManagerTest.DummyInvokable();
FileIOChannel.ID[] ids = new FileIOChannel.ID[NUM_CHANNELS];
BlockChannelWriter<MemorySegment>[] writers = new BlockChannelWriter[NUM_CHANNELS];
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
index c9ca9fb..c71e181 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
@@ -40,7 +40,7 @@ import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.DefaultMemoryManagerTest;
+import org.apache.flink.runtime.memory.MemoryManagerTest;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.junit.After;
import org.junit.Before;
@@ -62,7 +62,7 @@ public class IOManagerPerformanceBenchmark {
private static final int NUM_INTS_WRITTEN = 100000000;
- private static final AbstractInvokable memoryOwner = new DefaultMemoryManagerTest.DummyInvokable();
+ private static final AbstractInvokable memoryOwner = new MemoryManagerTest.DummyInvokable();
private DefaultMemoryManager memManager;
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/memory/DefaultMemoryManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/DefaultMemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/DefaultMemoryManagerTest.java
deleted file mode 100644
index afd40a2..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/DefaultMemoryManagerTest.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.memory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.junit.Assert;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class DefaultMemoryManagerTest
-{
- private static final long RANDOM_SEED = 643196033469871L;
-
- private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes
-
- private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes
-
- private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
-
- private DefaultMemoryManager memoryManager;
-
- private Random random;
-
- @Before
- public void setUp()
- {
- this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, PAGE_SIZE);
- this.random = new Random(RANDOM_SEED);
- }
-
- @After
- public void tearDown()
- {
- if (!this.memoryManager.verifyEmpty()) {
- Assert.fail("Memory manager is not complete empty and valid at the end of the test.");
- }
- this.memoryManager = null;
- this.random = null;
- }
-
- @Test
- public void allocateAllSingle() throws Exception
- {
- final AbstractInvokable mockInvoke = new DummyInvokable();
- List<MemorySegment> segments = new ArrayList<MemorySegment>();
-
- try {
- for (int i = 0; i < NUM_PAGES; i++) {
- segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
- }
- } catch (MemoryAllocationException e) {
- Assert.fail("Unable to allocate memory");
- }
-
- this.memoryManager.release(segments);
- }
-
- @Test
- public void allocateAllMulti() throws Exception
- {
- final AbstractInvokable mockInvoke = new DummyInvokable();
- final List<MemorySegment> segments = new ArrayList<MemorySegment>();
-
- try {
- for(int i = 0; i < NUM_PAGES / 2; i++) {
- segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
- }
- } catch (MemoryAllocationException e) {
- Assert.fail("Unable to allocate memory");
- }
-
- this.memoryManager.release(segments);
- }
-
- @Test
- public void allocateMultipleOwners()
- {
- final int NUM_OWNERS = 17;
-
- try {
- AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
- @SuppressWarnings("unchecked")
- List<MemorySegment>[] mems = new List[NUM_OWNERS];
-
- for (int i = 0; i < NUM_OWNERS; i++) {
- owners[i] = new DummyInvokable();
- mems[i] = new ArrayList<MemorySegment>(64);
- }
-
- // allocate all memory to the different owners
- for (int i = 0; i < NUM_PAGES; i++) {
- final int owner = this.random.nextInt(NUM_OWNERS);
- mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
- }
-
- // free one owner at a time
- for (int i = 0; i < NUM_OWNERS; i++) {
- this.memoryManager.releaseAll(owners[i]);
- owners[i] = null;
- Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
- mems[i] = null;
-
- // check that the owner owners were not affected
- for (int k = i+1; k < NUM_OWNERS; k++) {
- Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("Test encountered an exception: " + e.getMessage());
- }
- }
-
- @Test
- public void allocateTooMuch()
- {
- try {
- final AbstractInvokable mockInvoke = new DummyInvokable();
-
- List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
-
- try {
- this.memoryManager.allocatePages(mockInvoke, 1);
- Assert.fail("Expected MemoryAllocationException.");
- } catch (MemoryAllocationException maex) {
- // expected
- }
-
- Assert.assertTrue("The previously allocated segments were not valid any more.",
- allMemorySegmentsValid(segs));
-
- this.memoryManager.releaseAll(mockInvoke);
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("Test encountered an exception: " + e.getMessage());
- }
- }
-
- private boolean allMemorySegmentsValid(List<MemorySegment> memSegs)
- {
- for (MemorySegment seg : memSegs) {
- if (seg.isFreed()) {
- return false;
- }
- }
- return true;
- }
-
- private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs)
- {
- for (MemorySegment seg : memSegs) {
- if (!seg.isFreed()) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Utility class to serve as owner for the memory.
- */
- public static final class DummyInvokable extends AbstractInvokable {
- @Override
- public void registerInputOutput() {}
-
- @Override
- public void invoke() throws Exception {}
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
new file mode 100644
index 0000000..e077d08
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the memory manager, in the mode where it pre-allocates all memory.
+ */
+public class MemoryManagerLazyAllocationTest {
+
+ private static final long RANDOM_SEED = 643196033469871L;
+
+ private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes
+
+ private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes
+
+ private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
+
+ private DefaultMemoryManager memoryManager;
+
+ private Random random;
+
+
+ @Before
+ public void setUp() {
+ this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, false);
+ this.random = new Random(RANDOM_SEED);
+ }
+
+ @After
+ public void tearDown() {
+ if (!this.memoryManager.verifyEmpty()) {
+ fail("Memory manager is not complete empty and valid at the end of the test.");
+ }
+ this.memoryManager = null;
+ this.random = null;
+ }
+
+ @Test
+ public void allocateAllSingle() {
+ try {
+ final AbstractInvokable mockInvoke = new DummyInvokable();
+ List<MemorySegment> segments = new ArrayList<MemorySegment>();
+
+ try {
+ for (int i = 0; i < NUM_PAGES; i++) {
+ segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
+ }
+ }
+ catch (MemoryAllocationException e) {
+ fail("Unable to allocate memory");
+ }
+
+ for (MemorySegment seg : segments) {
+ this.memoryManager.release(seg);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void allocateAllMulti() {
+ try {
+ final AbstractInvokable mockInvoke = new DummyInvokable();
+ final List<MemorySegment> segments = new ArrayList<MemorySegment>();
+
+ try {
+ for(int i = 0; i < NUM_PAGES / 2; i++) {
+ segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
+ }
+ } catch (MemoryAllocationException e) {
+ Assert.fail("Unable to allocate memory");
+ }
+
+ this.memoryManager.release(segments);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void allocateMultipleOwners() {
+ final int NUM_OWNERS = 17;
+
+ try {
+ AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
+
+ @SuppressWarnings("unchecked")
+ List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[NUM_OWNERS];
+
+ for (int i = 0; i < NUM_OWNERS; i++) {
+ owners[i] = new DummyInvokable();
+ mems[i] = new ArrayList<MemorySegment>(64);
+ }
+
+ // allocate all memory to the different owners
+ for (int i = 0; i < NUM_PAGES; i++) {
+ final int owner = this.random.nextInt(NUM_OWNERS);
+ mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
+ }
+
+ // free one owner at a time
+ for (int i = 0; i < NUM_OWNERS; i++) {
+ this.memoryManager.releaseAll(owners[i]);
+ owners[i] = null;
+ Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
+ mems[i] = null;
+
+ // check that the owner owners were not affected
+ for (int k = i+1; k < NUM_OWNERS; k++) {
+ Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void allocateTooMuch() {
+ try {
+ final AbstractInvokable mockInvoke = new DummyInvokable();
+
+ List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
+
+ try {
+ this.memoryManager.allocatePages(mockInvoke, 1);
+ Assert.fail("Expected MemoryAllocationException.");
+ } catch (MemoryAllocationException maex) {
+ // expected
+ }
+
+ Assert.assertTrue("The previously allocated segments were not valid any more.",
+ allMemorySegmentsValid(segs));
+
+ this.memoryManager.releaseAll(mockInvoke);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private boolean allMemorySegmentsValid(List<MemorySegment> memSegs) {
+ for (MemorySegment seg : memSegs) {
+ if (seg.isFreed()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs) {
+ for (MemorySegment seg : memSegs) {
+ if (!seg.isFreed()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Utility class to serve as owner for the memory.
+ */
+ public static final class DummyInvokable extends AbstractInvokable {
+ @Override
+ public void registerInputOutput() {}
+
+ @Override
+ public void invoke() throws Exception {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
new file mode 100644
index 0000000..c211357
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the memory manager, in the mode where it pre-allocates all memory.
+ */
+public class MemoryManagerTest {
+
+ private static final long RANDOM_SEED = 643196033469871L;
+
+ private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes
+
+ private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes
+
+ private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
+
+ private DefaultMemoryManager memoryManager;
+
+ private Random random;
+
+
+ @Before
+ public void setUp() {
+ this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, true);
+ this.random = new Random(RANDOM_SEED);
+ }
+
+ @After
+ public void tearDown() {
+ if (!this.memoryManager.verifyEmpty()) {
+ fail("Memory manager is not complete empty and valid at the end of the test.");
+ }
+ this.memoryManager = null;
+ this.random = null;
+ }
+
+ @Test
+ public void allocateAllSingle() {
+ try {
+ final AbstractInvokable mockInvoke = new DummyInvokable();
+ List<MemorySegment> segments = new ArrayList<MemorySegment>();
+
+ try {
+ for (int i = 0; i < NUM_PAGES; i++) {
+ segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
+ }
+ }
+ catch (MemoryAllocationException e) {
+ fail("Unable to allocate memory");
+ }
+
+ this.memoryManager.release(segments);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void allocateAllMulti() {
+ try {
+ final AbstractInvokable mockInvoke = new DummyInvokable();
+ final List<MemorySegment> segments = new ArrayList<MemorySegment>();
+
+ try {
+ for(int i = 0; i < NUM_PAGES / 2; i++) {
+ segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
+ }
+ } catch (MemoryAllocationException e) {
+ Assert.fail("Unable to allocate memory");
+ }
+
+ this.memoryManager.release(segments);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void allocateMultipleOwners() {
+ final int NUM_OWNERS = 17;
+
+ try {
+ AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
+
+ @SuppressWarnings("unchecked")
+ List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[NUM_OWNERS];
+
+ for (int i = 0; i < NUM_OWNERS; i++) {
+ owners[i] = new DummyInvokable();
+ mems[i] = new ArrayList<MemorySegment>(64);
+ }
+
+ // allocate all memory to the different owners
+ for (int i = 0; i < NUM_PAGES; i++) {
+ final int owner = this.random.nextInt(NUM_OWNERS);
+ mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
+ }
+
+ // free one owner at a time
+ for (int i = 0; i < NUM_OWNERS; i++) {
+ this.memoryManager.releaseAll(owners[i]);
+ owners[i] = null;
+ Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
+ mems[i] = null;
+
+ // check that the owner owners were not affected
+ for (int k = i+1; k < NUM_OWNERS; k++) {
+ Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void allocateTooMuch() {
+ try {
+ final AbstractInvokable mockInvoke = new DummyInvokable();
+
+ List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
+
+ try {
+ this.memoryManager.allocatePages(mockInvoke, 1);
+ Assert.fail("Expected MemoryAllocationException.");
+ } catch (MemoryAllocationException maex) {
+ // expected
+ }
+
+ Assert.assertTrue("The previously allocated segments were not valid any more.",
+ allMemorySegmentsValid(segs));
+
+ this.memoryManager.releaseAll(mockInvoke);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private boolean allMemorySegmentsValid(List<MemorySegment> memSegs) {
+ for (MemorySegment seg : memSegs) {
+ if (seg.isFreed()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs) {
+ for (MemorySegment seg : memSegs) {
+ if (!seg.isFreed()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Utility class to serve as owner for the memory.
+ */
+ public static final class DummyInvokable extends AbstractInvokable {
+ @Override
+ public void registerInputOutput() {}
+
+ @Override
+ public void invoke() throws Exception {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java
index cefd0c5..f9adb94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java
@@ -51,8 +51,8 @@ public class MemorySegmentTest {
@Before
public void setUp() throws Exception{
try {
- this.manager = new DefaultMemoryManager(MANAGED_MEMORY_SIZE, 1, PAGE_SIZE);
- this.segment = manager.allocatePages(new DefaultMemoryManagerTest.DummyInvokable(), 1).get(0);
+ this.manager = new DefaultMemoryManager(MANAGED_MEMORY_SIZE, 1, PAGE_SIZE, true);
+ this.segment = manager.allocatePages(new MemoryManagerTest.DummyInvokable(), 1).get(0);
this.random = new Random(RANDOM_SEED);
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 02bffec..b1466c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -69,7 +69,7 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
public TestTaskContext() {}
public TestTaskContext(long memoryInBytes) {
- this.memoryManager = new DefaultMemoryManager(memoryInBytes,1 ,32 * 1024);
+ this.memoryManager = new DefaultMemoryManager(memoryInBytes,1 ,32 * 1024, true);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index 5012d1e..f5105bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -101,10 +101,9 @@ public class NonReusingReOpenableHashTableITCase {
private TypeComparator<Record> recordProbeSideComparator;
private TypePairComparator<Record, Record> pactRecordComparator;
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
@Before
- public void beforeTest()
- {
+ public void beforeTest() {
this.recordSerializer = RecordSerializer.get();
this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {Key.class});
@@ -121,13 +120,12 @@ public class NonReusingReOpenableHashTableITCase {
this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
- this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE);
+ this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, true);
this.ioManager = new IOManagerAsync();
}
@After
- public void afterTest()
- {
+ public void afterTest() {
if (this.ioManager != null) {
this.ioManager.shutdown();
if (!this.ioManager.isProperlyShutDown()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index fd2f906..7172887 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -100,10 +100,9 @@ public class ReusingReOpenableHashTableITCase {
private TypeComparator<Record> recordProbeSideComparator;
private TypePairComparator<Record, Record> pactRecordComparator;
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
@Before
- public void beforeTest()
- {
+ public void beforeTest() {
this.recordSerializer = RecordSerializer.get();
this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
@@ -120,13 +119,12 @@ public class ReusingReOpenableHashTableITCase {
this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
- this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE);
+ this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, true);
this.ioManager = new IOManagerAsync();
}
@After
- public void afterTest()
- {
+ public void afterTest() {
if (this.ioManager != null) {
this.ioManager.shutdown();
if (!this.ioManager.isProperlyShutDown()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
index 498cb61..429062b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
@@ -62,7 +62,7 @@ public class LargeRecordHandlerITCase {
final int NUM_RECORDS = 10;
try {
- final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+ final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
final AbstractInvokable owner = new DummyInvokable();
final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);
@@ -203,7 +203,7 @@ public class LargeRecordHandlerITCase {
FileIOChannel.ID channel = null;
try {
- final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+ final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
final AbstractInvokable owner = new DummyInvokable();
final List<MemorySegment> memory = memMan.allocatePages(owner, NUM_PAGES);
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
index 6eb736f..423ff9a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
@@ -49,7 +49,7 @@ public class LargeRecordHandlerTest {
final int NUM_PAGES = 50;
try {
- final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+ final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
final AbstractInvokable owner = new DummyInvokable();
final List<MemorySegment> memory = memMan.allocatePages(owner, NUM_PAGES);
@@ -101,7 +101,7 @@ public class LargeRecordHandlerTest {
final int NUM_RECORDS = 25000;
try {
- final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+ final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
final AbstractInvokable owner = new DummyInvokable();
final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);
@@ -189,7 +189,7 @@ public class LargeRecordHandlerTest {
final int NUM_RECORDS = 25000;
try {
- final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+ final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
final AbstractInvokable owner = new DummyInvokable();
final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index dca3c58..7d4994d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -85,7 +85,7 @@ public class TaskManagerComponentsStartupShutdownTest {
final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
- final MemoryManager memManager = new DefaultMemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE);
+ final MemoryManager memManager = new DefaultMemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE, false);
final IOManager ioManager = new IOManagerAsync(TMP_DIR);
final NetworkEnvironment network = new NetworkEnvironment(timeout, netConf);
final int numberOfSlots = 1;