You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:16 UTC
[52/63] [abbrv] git commit: Adjust tests to new JobGraphModel
Adjust tests to new JobGraphModel
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/caa4ebef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/caa4ebef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/caa4ebef
Branch: refs/heads/master
Commit: caa4ebef82d84b5b4ffb945f90c06e0e82b9a102
Parents: e56d883
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 15 16:54:38 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:50 2014 +0200
----------------------------------------------------------------------
.../client/program/PackagedProgramTest.java | 1 -
.../runtime/jobgraph/AbstractJobVertex.java | 6 +-
.../runtime/jobgraph/JobManagerTestUtils.java | 89 +++++++++
.../jobmanager/CoLocationConstraintITCase.java | 111 +++++++++++
.../runtime/jobmanager/JobManagerITCase.java | 148 +--------------
.../runtime/jobmanager/SlotSharingITCase.java | 186 +++++++++++++++++++
.../tasks/AgnosticBinaryReceiver.java | 41 ++++
.../jobmanager/tasks/AgnosticReceiver.java | 38 ++++
.../runtime/jobmanager/tasks/Receiver.java | 44 +++++
.../flink/runtime/jobmanager/tasks/Sender.java | 46 +++++
.../BroadcastVarsNepheleITCase.java | 26 +--
.../KMeansIterativeNepheleITCase.java | 29 +--
.../test/cancelling/CancellingTestBase.java | 1 -
.../test/iterative/nephele/JobGraphUtils.java | 48 ++---
.../test/runtime/NetworkStackThroughput.java | 54 +++---
flink-tests/src/test/resources/logback-test.xml | 4 +-
16 files changed, 655 insertions(+), 217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
index 372c65b..4adfdb8 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
@@ -31,7 +31,6 @@ public class PackagedProgramTest {
@Test
public void testGetPreviewPlan() {
try {
-
PackagedProgram prog = new PackagedProgram(new File(CliFrontendTestUtils.getTestJarPath()));
Assert.assertNotNull(prog.getPreviewPlan());
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
index 82823b2..d2462ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
@@ -62,11 +62,13 @@ public class AbstractJobVertex implements java.io.Serializable {
/** Optionally, a source of input splits */
private InputSplitSource<?> inputSplitSource;
+ /** The name of the vertex */
+ private String name;
+
/** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */
private SlotSharingGroup slotSharingGroup;
- /** The name of the vertex */
- private String name;
+// private AbstractJobVertex coLocatedWith
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
new file mode 100644
index 0000000..14a73e1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.ExecutionMode;
+import org.apache.flink.runtime.jobmanager.JobManager;
+
+/**
+ * Static helpers shared by the JobManager integration tests: starting a local
+ * JobManager, finding a free port, and waiting for task threads to finish.
+ */
+public class JobManagerTestUtils {
+
+    /**
+     * Starts a JobManager in {@link ExecutionMode#LOCAL} mode and waits until it
+     * reports the requested number of available slots.
+     *
+     * The wait polls every 10 ms for at most 5 seconds; afterwards the slot count
+     * is asserted, so the call fails if the slots did not become available in time.
+     *
+     * @param numSlots the number of task slots to configure and to wait for
+     * @return the started JobManager
+     * @throws Exception if the JobManager cannot be set up or the wait is interrupted
+     */
+    public static final JobManager startJobManager(int numSlots) throws Exception {
+        Configuration cfg = new Configuration();
+        cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+        cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
+        cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
+        cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+
+        GlobalConfiguration.includeConfiguration(cfg);
+
+        JobManager jm = new JobManager(ExecutionMode.LOCAL);
+
+        // we need to wait until the taskmanager is registered
+        // max time is 5 seconds
+        long deadline = System.currentTimeMillis() + 5000;
+
+        while (jm.getAvailableSlots() < numSlots && System.currentTimeMillis() < deadline) {
+            Thread.sleep(10);
+        }
+
+        assertEquals(numSlots, jm.getAvailableSlots());
+
+        return jm;
+    }
+
+    /**
+     * Finds a currently free port by binding a server socket to port 0 and
+     * reading back the port that was assigned. Up to 50 attempts are made.
+     *
+     * The socket is closed again before the port is returned, so the port is only
+     * known to have been free at the time of the probe.
+     *
+     * @return a port number that was free when probed
+     * @throws IOException if a socket cannot be opened or closed, or if no
+     *         attempt yielded a non-zero port
+     */
+    public static int getAvailablePort() throws IOException {
+        for (int i = 0; i < 50; i++) {
+            ServerSocket serverSocket = null;
+            try {
+                serverSocket = new ServerSocket(0);
+                int port = serverSocket.getLocalPort();
+                if (port != 0) {
+                    return port;
+                }
+            } finally {
+                // the constructor may have thrown, in which case there is nothing to close;
+                // closing unconditionally would hide the IOException behind a NullPointerException
+                if (serverSocket != null) {
+                    serverSocket.close();
+                }
+            }
+        }
+
+        throw new IOException("could not find free port");
+    }
+
+    /**
+     * Blocks until every enumerated thread whose thread group is named
+     * "Task Threads" has terminated.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while joining
+     */
+    public static void waitForTaskThreadsToBeTerminated() throws InterruptedException {
+        Thread[] threads = new Thread[Thread.activeCount()];
+        Thread.enumerate(threads);
+
+        for (Thread t : threads) {
+            // the array may be only partially filled
+            if (t == null) {
+                continue;
+            }
+            ThreadGroup tg = t.getThreadGroup();
+            if (tg != null && tg.getName() != null && tg.getName().equals("Task Threads")) {
+                t.join();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
new file mode 100644
index 0000000..9bda8ed
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmanager.tasks.Receiver;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
+import org.junit.Test;
+
+public class CoLocationConstraintITCase {
+
+    /**
+     * This job runs in N slots with N senders and N receivers, connected pointwise and placed
+     * in one slot sharing group. Unless slot sharing is used, it cannot complete.
+     *
+     * NOTE(review): despite the class name, no co-location constraint is configured in this
+     * test; it only sets a slot sharing group. Confirm whether that is intended.
+     */
+    @Test
+    public void testForwardJob() {
+
+        final int NUM_TASKS = 31;
+
+        try {
+            final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+            final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+            sender.setInvokableClass(Sender.class);
+            receiver.setInvokableClass(Receiver.class);
+
+            sender.setParallelism(NUM_TASKS);
+            receiver.setParallelism(NUM_TASKS);
+
+            receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+            SlotSharingGroup sharingGroup = new SlotSharingGroup(sender.getID(), receiver.getID());
+            sender.setSlotSharingGroup(sharingGroup);
+            receiver.setSlotSharingGroup(sharingGroup);
+
+            final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+            final JobManager jm = startJobManager(NUM_TASKS);
+
+            try {
+                // looked up inside the try block, so the JobManager is shut down
+                // even if this lookup fails
+                final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+                        .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
+                // we need to register the job at the library cache manager (with no libraries)
+                LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+                JobSubmissionResult result = jm.submitJob(jobGraph);
+
+                if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+                    System.out.println(result.getDescription());
+                }
+                assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+                // monitor the execution
+                ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+                if (eg != null) {
+                    eg.waitForJobEnd();
+                    assertEquals(JobStatus.FINISHED, eg.getState());
+
+                    assertEquals(0, eg.getRegisteredExecutions().size());
+                }
+                else {
+                    // already done, that was fast;
+                }
+
+                // make sure that in any case, the network buffers are all returned
+                waitForTaskThreadsToBeTerminated();
+                assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+            }
+            finally {
+                jm.shutdown();
+            }
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index f4d74a3..44d1c11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -18,17 +18,12 @@
package org.apache.flink.runtime.jobmanager;
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.io.IOException;
-import java.net.ServerSocket;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -42,8 +37,12 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.tasks.AgnosticBinaryReceiver;
+import org.apache.flink.runtime.jobmanager.tasks.AgnosticReceiver;
import org.apache.flink.runtime.jobmanager.tasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
+import org.apache.flink.runtime.jobmanager.tasks.Receiver;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
import org.apache.flink.runtime.types.IntegerRecord;
import org.junit.Test;
@@ -837,144 +836,9 @@ public class JobManagerITCase {
}
// --------------------------------------------------------------------------------------------
-
- private static final JobManager startJobManager(int numSlots) throws Exception {
- Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
- cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
-
- GlobalConfiguration.includeConfiguration(cfg);
-
- JobManager jm = new JobManager(ExecutionMode.LOCAL);
-
- // we need to wait until the taskmanager is registered
- // max time is 5 seconds
- long deadline = System.currentTimeMillis() + 5000;
-
- while (jm.getAvailableSlots() < numSlots && System.currentTimeMillis() < deadline) {
- Thread.sleep(10);
- }
-
- assertEquals(numSlots, jm.getAvailableSlots());
-
- return jm;
- }
-
- private static int getAvailablePort() throws IOException {
- for (int i = 0; i < 50; i++) {
- ServerSocket serverSocket = null;
- try {
- serverSocket = new ServerSocket(0);
- int port = serverSocket.getLocalPort();
- if (port != 0) {
- return port;
- }
- } finally {
- serverSocket.close();
- }
- }
-
- throw new IOException("could not find free port");
- }
-
- private static void waitForTaskThreadsToBeTerminated() throws InterruptedException {
- Thread[] threads = new Thread[Thread.activeCount()];
- Thread.enumerate(threads);
-
- for (Thread t : threads) {
- if (t == null) {
- continue;
- }
- ThreadGroup tg = t.getThreadGroup();
- if (tg != null && tg.getName() != null && tg.getName().equals("Task Threads")) {
- t.join();
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
// Simple test tasks
// --------------------------------------------------------------------------------------------
- public static final class Sender extends AbstractInvokable {
-
- private RecordWriter<IntegerRecord> writer;
-
- @Override
- public void registerInputOutput() {
- writer = new RecordWriter<IntegerRecord>(this);
- }
-
- @Override
- public void invoke() throws Exception {
- try {
- writer.initializeSerializers();
- writer.emit(new IntegerRecord(42));
- writer.emit(new IntegerRecord(1337));
- writer.flush();
- }
- finally {
- writer.clearBuffers();
- }
- }
- }
-
- public static final class Receiver extends AbstractInvokable {
-
- private RecordReader<IntegerRecord> reader;
-
- @Override
- public void registerInputOutput() {
- reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
- }
-
- @Override
- public void invoke() throws Exception {
- IntegerRecord i1 = reader.next();
- IntegerRecord i2 = reader.next();
- IntegerRecord i3 = reader.next();
-
- if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
- throw new Exception("Wrong Data Received");
- }
- }
- }
-
- public static final class AgnosticReceiver extends AbstractInvokable {
-
- private RecordReader<IntegerRecord> reader;
-
- @Override
- public void registerInputOutput() {
- reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
- }
-
- @Override
- public void invoke() throws Exception {
- while (reader.next() != null);
- }
- }
-
- public static final class AgnosticBinaryReceiver extends AbstractInvokable {
-
- private RecordReader<IntegerRecord> reader1;
- private RecordReader<IntegerRecord> reader2;
-
- @Override
- public void registerInputOutput() {
- reader1 = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
- reader2 = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
- }
-
- @Override
- public void invoke() throws Exception {
- while (reader1.next() != null);
- while (reader2.next() != null);
- }
- }
-
public static final class ExceptionSender extends AbstractInvokable {
private RecordWriter<IntegerRecord> writer;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
new file mode 100644
index 0000000..98abc8d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmanager.tasks.AgnosticBinaryReceiver;
+import org.apache.flink.runtime.jobmanager.tasks.Receiver;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
+import org.junit.Test;
+
+public class SlotSharingITCase {
+
+    /**
+     * This job runs in N slots with N senders and N receivers. Unless slot sharing is used, it cannot complete.
+     */
+    @Test
+    public void testForwardJob() {
+
+        final int NUM_TASKS = 31;
+
+        try {
+            final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+            final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+            sender.setInvokableClass(Sender.class);
+            receiver.setInvokableClass(Receiver.class);
+
+            sender.setParallelism(NUM_TASKS);
+            receiver.setParallelism(NUM_TASKS);
+
+            receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+            SlotSharingGroup sharingGroup = new SlotSharingGroup(sender.getID(), receiver.getID());
+            sender.setSlotSharingGroup(sharingGroup);
+            receiver.setSlotSharingGroup(sharingGroup);
+
+            final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+            runJobAndVerify(jobGraph, NUM_TASKS);
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    /**
+     * This job runs in N slots with 2 * N senders and N receivers. Unless slot sharing is used, it cannot complete.
+     */
+    @Test
+    public void testTwoInputJob() {
+
+        final int NUM_TASKS = 11;
+
+        try {
+            final AbstractJobVertex sender1 = new AbstractJobVertex("Sender1");
+            final AbstractJobVertex sender2 = new AbstractJobVertex("Sender2");
+            final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+            sender1.setInvokableClass(Sender.class);
+            sender2.setInvokableClass(Sender.class);
+            receiver.setInvokableClass(AgnosticBinaryReceiver.class);
+
+            sender1.setParallelism(NUM_TASKS);
+            sender2.setParallelism(NUM_TASKS);
+            receiver.setParallelism(NUM_TASKS);
+
+            SlotSharingGroup sharingGroup = new SlotSharingGroup(sender1.getID(), sender2.getID(), receiver.getID());
+            sender1.setSlotSharingGroup(sharingGroup);
+            sender2.setSlotSharingGroup(sharingGroup);
+            receiver.setSlotSharingGroup(sharingGroup);
+
+            receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE);
+            receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE);
+
+            final JobGraph jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2);
+
+            runJobAndVerify(jobGraph, NUM_TASKS);
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    /**
+     * Starts a JobManager with the given number of slots, submits the job, waits for it to end
+     * and checks that it finished, that no executions remain registered, and that all network
+     * buffers were returned. The JobManager is shut down in every case.
+     *
+     * @param jobGraph the job to execute
+     * @param numSlots the number of slots the JobManager is started with
+     * @throws Exception if starting the JobManager or running the job fails
+     */
+    private static void runJobAndVerify(JobGraph jobGraph, int numSlots) throws Exception {
+        final JobManager jm = startJobManager(numSlots);
+
+        try {
+            // looked up inside the try block, so the JobManager is shut down
+            // even if this lookup fails
+            final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+                    .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
+            // we need to register the job at the library cache manager (with no libraries)
+            LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+            JobSubmissionResult result = jm.submitJob(jobGraph);
+
+            if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+                System.out.println(result.getDescription());
+            }
+            assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+            // monitor the execution
+            ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+            if (eg != null) {
+                eg.waitForJobEnd();
+                assertEquals(JobStatus.FINISHED, eg.getState());
+
+                assertEquals(0, eg.getRegisteredExecutions().size());
+            }
+            else {
+                // already done, that was fast;
+            }
+
+            // make sure that in any case, the network buffers are all returned
+            waitForTaskThreadsToBeTerminated();
+            assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+        }
+        finally {
+            jm.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticBinaryReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticBinaryReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticBinaryReceiver.java
new file mode 100644
index 0000000..3784205
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticBinaryReceiver.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+/**
+ * Test task with two inputs that reads both of them to the end and discards
+ * every record, without checking the contents.
+ */
+public final class AgnosticBinaryReceiver extends AbstractInvokable {
+
+    private RecordReader<IntegerRecord> firstInput;
+    private RecordReader<IntegerRecord> secondInput;
+
+    @Override
+    public void registerInputOutput() {
+        firstInput = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+        secondInput = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+    }
+
+    @Override
+    public void invoke() throws Exception {
+        // the first input is consumed completely before the second one is touched
+        drain(firstInput);
+        drain(secondInput);
+    }
+
+    /** Reads from the given reader until it returns null, dropping every record. */
+    private static void drain(RecordReader<IntegerRecord> input) throws Exception {
+        IntegerRecord record = input.next();
+        while (record != null) {
+            record = input.next();
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticReceiver.java
new file mode 100644
index 0000000..ce38b46
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticReceiver.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+/**
+ * Test task that reads its single input to the end and discards every
+ * record, without checking the contents.
+ */
+public final class AgnosticReceiver extends AbstractInvokable {
+
+    private RecordReader<IntegerRecord> input;
+
+    @Override
+    public void registerInputOutput() {
+        input = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+    }
+
+    @Override
+    public void invoke() throws Exception {
+        // keep pulling until the reader signals the end of the input with null
+        IntegerRecord record = input.next();
+        while (record != null) {
+            record = input.next();
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Receiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Receiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Receiver.java
new file mode 100644
index 0000000..298673a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Receiver.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+/**
+ * Test task that expects exactly two records on its input, with the values
+ * 42 and 1337 in that order, followed by the end of the input.
+ */
+public final class Receiver extends AbstractInvokable {
+
+    private RecordReader<IntegerRecord> reader;
+
+    @Override
+    public void registerInputOutput() {
+        reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+    }
+
+    /**
+     * Reads three times from the input and verifies the result.
+     *
+     * @throws Exception with the message "Wrong Data Received" if a record is missing,
+     *         a value differs, or the input holds more than two records
+     */
+    @Override
+    public void invoke() throws Exception {
+        IntegerRecord i1 = reader.next();
+        IntegerRecord i2 = reader.next();
+        IntegerRecord i3 = reader.next();
+
+        // a null record marks the end of the input; check for it before dereferencing, so
+        // that too few records are reported as wrong data rather than a NullPointerException
+        if (i1 == null || i2 == null || i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
+            throw new Exception("Wrong Data Received");
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Sender.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Sender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Sender.java
new file mode 100644
index 0000000..340465b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Sender.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+/**
+ * Test task that writes a fixed sequence of two {@link IntegerRecord}s (42, then 1337)
+ * to its output and then flushes the writer.
+ */
+public final class Sender extends AbstractInvokable {
+
+	/** Values emitted by {@link #invoke()}, in emission order. */
+	private static final int[] PAYLOAD = {42, 1337};
+
+	private RecordWriter<IntegerRecord> out;
+
+	/**
+	 * Creates the record writer through which {@link #invoke()} emits its records.
+	 */
+	@Override
+	public void registerInputOutput() {
+		this.out = new RecordWriter<IntegerRecord>(this);
+	}
+
+	/**
+	 * Initializes the writer's serializers, emits the payload records in order and flushes.
+	 *
+	 * @throws Exception if any of the writer calls fails
+	 */
+	@Override
+	public void invoke() throws Exception {
+		try {
+			out.initializeSerializers();
+			for (int value : PAYLOAD) {
+				out.emit(new IntegerRecord(value));
+			}
+			out.flush();
+		}
+		finally {
+			// runs whether or not emitting succeeded
+			out.clearBuffers();
+		}
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
index 33112af..947a448 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
@@ -34,12 +34,12 @@ import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactor
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.io.network.channels.ChannelType;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
+import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobTaskVertex;
-import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
+import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.CollectorMapDriver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.RegularPactTask;
@@ -254,8 +254,8 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
return modelsInput;
}
- private static JobTaskVertex createMapper(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> serializer) {
- JobTaskVertex pointsInput = JobGraphUtils.createTask(RegularPactTask.class, "Map[DotProducts]", jobGraph, numSubTasks);
+ private static AbstractJobVertex createMapper(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> serializer) {
+ AbstractJobVertex pointsInput = JobGraphUtils.createTask(RegularPactTask.class, "Map[DotProducts]", jobGraph, numSubTasks);
{
TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
@@ -300,7 +300,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
// Unified solution set and workset tail update
// -------------------------------------------------------------------------------------------------------------
- private JobGraph createJobGraphV1(String pointsPath, String centersPath, String resultPath, int numSubTasks) throws JobGraphDefinitionException {
+ private JobGraph createJobGraphV1(String pointsPath, String centersPath, String resultPath, int numSubTasks) {
// -- init -------------------------------------------------------------------------------------------------
final TypeSerializerFactory<?> serializer = RecordSerializerFactory.get();
@@ -310,7 +310,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
// -- vertices ---------------------------------------------------------------------------------------------
InputFormatVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
InputFormatVertex models = createModelsInput(jobGraph, centersPath, numSubTasks, serializer);
- JobTaskVertex mapper = createMapper(jobGraph, numSubTasks, serializer);
+ AbstractJobVertex mapper = createMapper(jobGraph, numSubTasks, serializer);
OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
// -- edges ------------------------------------------------------------------------------------------------
@@ -319,9 +319,13 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
JobGraphUtils.connect(mapper, output, ChannelType.NETWORK, DistributionPattern.POINTWISE);
// -- instance sharing -------------------------------------------------------------------------------------
- points.setVertexToShareInstancesWith(output);
- models.setVertexToShareInstancesWith(output);
- mapper.setVertexToShareInstancesWith(output);
+
+ SlotSharingGroup sharing = new SlotSharingGroup();
+
+ points.setSlotSharingGroup(sharing);
+ models.setSlotSharingGroup(sharing);
+ mapper.setSlotSharingGroup(sharing);
+ output.setSlotSharingGroup(sharing);
return jobGraph;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
index 678a7e5..a31539f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -31,10 +31,11 @@ import org.apache.flink.runtime.io.network.channels.ChannelType;
import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
+import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
import org.apache.flink.runtime.operators.CollectorMapDriver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.GroupReduceDriver;
@@ -154,8 +155,8 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
return output;
}
- private static JobTaskVertex createIterationHead(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> serializer) {
- JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks);
+ private static AbstractJobVertex createIterationHead(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> serializer) {
+ AbstractJobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks);
TaskConfig headConfig = new TaskConfig(head.getConfiguration());
headConfig.setIterationId(ITERATION_ID);
@@ -188,11 +189,11 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
return head;
}
- private static JobTaskVertex createMapper(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> inputSerializer,
+ private static AbstractJobVertex createMapper(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> inputSerializer,
TypeSerializerFactory<?> broadcastVarSerializer, TypeSerializerFactory<?> outputSerializer,
TypeComparatorFactory<?> outputComparator)
{
- JobTaskVertex mapper = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
+ AbstractJobVertex mapper = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
"Map (Select nearest center)", jobGraph, numSubTasks);
TaskConfig intermediateConfig = new TaskConfig(mapper.getConfiguration());
@@ -217,12 +218,12 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
return mapper;
}
- private static JobTaskVertex createReducer(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> inputSerializer,
+ private static AbstractJobVertex createReducer(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> inputSerializer,
TypeComparatorFactory<?> inputComparator, TypeSerializerFactory<?> outputSerializer)
{
// ---------------- the tail (co group) --------------------
- JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "Reduce / Iteration Tail", jobGraph,
+ AbstractJobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "Reduce / Iteration Tail", jobGraph,
numSubTasks);
TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
@@ -252,8 +253,8 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
return tail;
}
- private static OutputFormatVertex createSync(JobGraph jobGraph, int numIterations, int dop) {
- OutputFormatVertex sync = JobGraphUtils.createSync(jobGraph, dop);
+ private static AbstractJobVertex createSync(JobGraph jobGraph, int numIterations, int dop) {
+ AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, dop);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
syncConfig.setNumberOfIterations(numIterations);
syncConfig.setIterationId(ITERATION_ID);
@@ -264,7 +265,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
// Unified solution set and workset tail update
// -------------------------------------------------------------------------------------------------------------
- private static JobGraph createJobGraph(String pointsPath, String centersPath, String resultPath, int numSubTasks, int numIterations) throws JobGraphDefinitionException {
+ private static JobGraph createJobGraph(String pointsPath, String centersPath, String resultPath, int numSubTasks, int numIterations) {
// -- init -------------------------------------------------------------------------------------------------
final TypeSerializerFactory<?> serializer = RecordSerializerFactory.get();
@@ -277,14 +278,14 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
InputFormatVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
InputFormatVertex centers = createCentersInput(jobGraph, centersPath, numSubTasks, serializer);
- JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer);
- JobTaskVertex mapper = createMapper(jobGraph, numSubTasks, serializer, serializer, serializer, int0Comparator);
+ AbstractJobVertex head = createIterationHead(jobGraph, numSubTasks, serializer);
+ AbstractJobVertex mapper = createMapper(jobGraph, numSubTasks, serializer, serializer, serializer, int0Comparator);
- JobTaskVertex reducer = createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer);
+ AbstractJobVertex reducer = createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer);
OutputFormatVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
- OutputFormatVertex sync = createSync(jobGraph, numIterations, numSubTasks);
+ AbstractJobVertex sync = createSync(jobGraph, numIterations, numSubTasks);
OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index c512525..8bf74c0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -196,7 +196,6 @@ public abstract class CancellingTestBase {
case CANCELED:
exitLoop = true;
break;
- case SCHEDULED: // okay
case RUNNING:
break;
default:
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
index 82bd046..2b4b779 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
import org.apache.flink.runtime.operators.DataSinkTask;
import org.apache.flink.runtime.operators.DataSourceTask;
@@ -45,8 +44,8 @@ public class JobGraphUtils {
public static final long MEGABYTE = 1024l * 1024l;
- private JobGraphUtils() {
- }
+ private JobGraphUtils() {}
+
public static void submit(JobGraph graph, Configuration nepheleConfig) throws IOException, JobExecutionException {
JobClient client = new JobClient(graph, nepheleConfig, JobGraphUtils.class.getClassLoader());
@@ -63,10 +62,10 @@ public class JobGraphUtils {
private static <T extends InputFormat<?,?>> InputFormatVertex createInput(UserCodeWrapper<T> stub, String name, JobGraph graph,
int degreeOfParallelism)
{
- InputFormatVertex inputVertex = new InputFormatVertex(graph, name);
+ InputFormatVertex inputVertex = new InputFormatVertex(name);
+ graph.addVertex(inputVertex);
inputVertex.setInvokableClass(DataSourceTask.class);
-
inputVertex.setParallelism(degreeOfParallelism);
TaskConfig inputConfig = new TaskConfig(inputVertex.getConfiguration());
@@ -83,42 +82,49 @@ public class JobGraphUtils {
// }
public static void connect(AbstractJobVertex source, AbstractJobVertex target, ChannelType channelType,
- DistributionPattern distributionPattern) throws JobGraphDefinitionException
+ DistributionPattern distributionPattern)
{
- source.connectTo(target, channelType, distributionPattern);
+ target.connectNewDataSetAsInput(source, distributionPattern);
}
- public static JobTaskVertex createTask(@SuppressWarnings("rawtypes") Class<? extends RegularPactTask> task, String name, JobGraph graph,
- int degreeOfParallelism)
+ @SuppressWarnings("rawtypes")
+ public static AbstractJobVertex createTask(Class<? extends RegularPactTask> task, String name, JobGraph graph, int parallelism)
{
- JobTaskVertex taskVertex = new JobTaskVertex(name, graph);
+ AbstractJobVertex taskVertex = new AbstractJobVertex(name);
+ graph.addVertex(taskVertex);
+
taskVertex.setInvokableClass(task);
- taskVertex.setNumberOfSubtasks(degreeOfParallelism);
+ taskVertex.setParallelism(parallelism);
return taskVertex;
}
- public static OutputFormatVertex createSync(JobGraph jobGraph, int degreeOfParallelism) {
- OutputFormatVertex sync = new OutputFormatVertex(jobGraph, "BulkIterationSync");
+ public static AbstractJobVertex createSync(JobGraph jobGraph, int parallelism) {
+ AbstractJobVertex sync = new AbstractJobVertex("BulkIterationSync");
+ jobGraph.addVertex(sync);
+
sync.setInvokableClass(IterationSynchronizationSinkTask.class);
sync.setParallelism(1);
+
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
- syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, degreeOfParallelism);
+ syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, parallelism);
return sync;
}
- public static OutputFormatVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
- {
- OutputFormatVertex outputVertex = new OutputFormatVertex(jobGraph, name);
+ public static OutputFormatVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism) {
+ OutputFormatVertex outputVertex = new OutputFormatVertex(name);
+ jobGraph.addVertex(outputVertex);
+
outputVertex.setInvokableClass(FakeOutputTask.class);
outputVertex.setParallelism(degreeOfParallelism);
return outputVertex;
}
- public static OutputFormatVertex createFileOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
- {
- OutputFormatVertex sinkVertex = new OutputFormatVertex(jobGraph, name);
+ public static OutputFormatVertex createFileOutput(JobGraph jobGraph, String name, int parallelism) {
+ OutputFormatVertex sinkVertex = new OutputFormatVertex(name);
+ jobGraph.addVertex(sinkVertex);
+
sinkVertex.setInvokableClass(DataSinkTask.class);
- sinkVertex.setParallelism(degreeOfParallelism);
+ sinkVertex.setParallelism(parallelism);
return sinkVertex;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
index c365378..7c79e35 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
@@ -28,14 +28,11 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.io.network.api.RecordReader;
import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobTaskVertex;
-import org.apache.flink.runtime.jobgraph.SimpleInputVertex;
-import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.test.util.RecordAPITestBase;
import org.junit.After;
@@ -95,38 +92,44 @@ public class NetworkStackThroughput {
}
private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender,
- boolean isSlowReceiver, int numSubtasks) throws JobGraphDefinitionException {
-
+ boolean isSlowReceiver, int numSubtasks)
+ {
JobGraph jobGraph = new JobGraph("Speed Test");
+ SlotSharingGroup sharingGroup = new SlotSharingGroup();
- SimpleInputVertex producer = new SimpleInputVertex("Speed Test Producer", jobGraph);
+ AbstractJobVertex producer = new AbstractJobVertex("Speed Test Producer");
+ jobGraph.addVertex(producer);
+ producer.setSlotSharingGroup(sharingGroup);
+
producer.setInvokableClass(SpeedTestProducer.class);
- producer.setNumberOfSubtasks(numSubtasks);
+ producer.setParallelism(numSubtasks);
producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
- JobTaskVertex forwarder = null;
+ AbstractJobVertex forwarder = null;
if (useForwarder) {
- forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
+ forwarder = new AbstractJobVertex("Speed Test Forwarder");
+ jobGraph.addVertex(forwarder);
+ forwarder.setSlotSharingGroup(sharingGroup);
+
forwarder.setInvokableClass(SpeedTestForwarder.class);
- forwarder.setNumberOfSubtasks(numSubtasks);
+ forwarder.setParallelism(numSubtasks);
}
- SimpleOutputVertex consumer = new SimpleOutputVertex("Speed Test Consumer", jobGraph);
+ AbstractJobVertex consumer = new AbstractJobVertex("Speed Test Consumer");
+ jobGraph.addVertex(consumer);
+ consumer.setSlotSharingGroup(sharingGroup);
+
consumer.setInvokableClass(SpeedTestConsumer.class);
- consumer.setNumberOfSubtasks(numSubtasks);
+ consumer.setParallelism(numSubtasks);
consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
if (useForwarder) {
- producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
- forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-
- forwarder.setVertexToShareInstancesWith(producer);
- consumer.setVertexToShareInstancesWith(producer);
+ forwarder.connectNewDataSetAsInput(producer, DistributionPattern.BIPARTITE);
+ consumer.connectNewDataSetAsInput(forwarder, DistributionPattern.BIPARTITE);
}
else {
- producer.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
- producer.setVertexToShareInstancesWith(consumer);
+ consumer.connectNewDataSetAsInput(producer, DistributionPattern.BIPARTITE);
}
return jobGraph;
@@ -285,9 +288,12 @@ public class NetworkStackThroughput {
TestBaseWrapper test = new TestBaseWrapper(config);
test.startCluster();
- test.testJob();
- test.calculateThroughput();
- test.stopCluster();
+ try {
+ test.testJob();
+ test.calculateThroughput();
+ } finally {
+ test.stopCluster();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-tests/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/logback-test.xml b/flink-tests/src/test/resources/logback-test.xml
index 7c47e0b..ec37329 100644
--- a/flink-tests/src/test/resources/logback-test.xml
+++ b/flink-tests/src/test/resources/logback-test.xml
@@ -23,12 +23,14 @@
</encoder>
</appender>
- <root level="WARN">
+ <root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
+<!--
<logger name="org.apache.flink.test.recordJobs.relational.query1Util.LineItemFilter" level="ERROR"/>
<logger name="org.apache.flink.runtime.operators.RegularPactTask" level="OFF"/>
<logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>
<logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>
+ -->
</configuration>
\ No newline at end of file