You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:26 UTC
[02/63] [abbrv] Refactor job graph construction to incremental
attachment based
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 8b891a9..78c9288 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.util;
import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -30,7 +29,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator;
import org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator;
import org.apache.flink.runtime.operators.sort.MergeMatchIterator;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java
new file mode 100644
index 0000000..3f87040
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+final class OneShotLatch {
+
+ private final Object lock = new Object();
+
+ private boolean triggered;
+
+ public void trigger() {
+ synchronized (lock) {
+ triggered = true;
+ lock.notifyAll();
+ }
+ }
+
+ public void await() throws InterruptedException {
+ synchronized (lock) {
+ while (!triggered) {
+ lock.wait();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
new file mode 100644
index 0000000..91dc5b3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.junit.Test;
+
+public class TaskExecutionStateTest {
+
+ @Test
+ public void testEqualsHashCode() {
+ try {
+ final JobID jid = new JobID();
+ final ExecutionAttemptID executionId = new ExecutionAttemptID();
+ final ExecutionState2 state = ExecutionState2.RUNNING;
+ final String description = "some test description";
+
+ TaskExecutionState s1 = new TaskExecutionState(jid, executionId, state, description);
+ TaskExecutionState s2 = new TaskExecutionState(jid, executionId, state, description);
+
+ assertEquals(s1.hashCode(), s2.hashCode());
+ assertEquals(s1, s2);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSerialization() {
+ try {
+ final JobID jid = new JobID();
+ final ExecutionAttemptID executionId = new ExecutionAttemptID();
+ final ExecutionState2 state = ExecutionState2.DEPLOYING;
+ final String description = "foo bar";
+
+ TaskExecutionState original = new TaskExecutionState(jid, executionId, state, description);
+
+ TaskExecutionState writableCopy = CommonTestUtils.createCopyWritable(original);
+ TaskExecutionState javaSerCopy = CommonTestUtils.createCopySerializable(original);
+
+ assertEquals(original, writableCopy);
+ assertEquals(original, javaSerCopy);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
new file mode 100644
index 0000000..e7639e3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.ExecutionMode;
+import org.apache.flink.runtime.deployment.ChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.io.network.channels.ChannelID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.types.IntegerRecord;
+import org.apache.flink.util.LogUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+
+public class TaskManagerTest {
+
+ @BeforeClass
+ public static void reduceLogLevel() {
+ LogUtils.initializeDefaultTestConsoleLogger();
+ }
+
+ @Test
+ public void testSetupTaskManager() {
+ try {
+ JobManager jobManager = getJobManagerMockBase();
+
+ TaskManager tm = createTaskManager(jobManager);
+
+ JobID jid = new JobID();
+ JobVertexID vid = new JobVertexID();
+ ExecutionAttemptID eid = new ExecutionAttemptID();
+
+ TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
+ new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ new String[0], 0);
+
+ LibraryCacheManager.register(jid, new String[0]);
+
+ TaskOperationResult result = tm.submitTask(tdd);
+ assertTrue(result.isSuccess());
+ assertEquals(eid, result.getExecutionId());
+ assertEquals(vid, result.getVertexId());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testJobSubmissionAndCanceling() {
+ try {
+ JobManager jobManager = getJobManagerMockBase();
+
+ TaskManager tm = createTaskManager(jobManager);
+
+ JobID jid1 = new JobID();
+ JobID jid2 = new JobID();
+
+ JobVertexID vid1 = new JobVertexID();
+ JobVertexID vid2 = new JobVertexID();
+
+ ExecutionAttemptID eid1 = new ExecutionAttemptID();
+ ExecutionAttemptID eid2 = new ExecutionAttemptID();
+
+ TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5,
+ new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ new String[0], 0);
+
+ TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7,
+ new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ new String[0], 0);
+
+ LibraryCacheManager.register(jid1, new String[0]);
+ LibraryCacheManager.register(jid2, new String[0]);
+ assertNotNull(LibraryCacheManager.getClassLoader(jid1));
+ assertNotNull(LibraryCacheManager.getClassLoader(jid2));
+
+ TaskOperationResult result1 = tm.submitTask(tdd1);
+ TaskOperationResult result2 = tm.submitTask(tdd2);
+
+ assertTrue(result1.isSuccess());
+ assertTrue(result2.isSuccess());
+ assertEquals(eid1, result1.getExecutionId());
+ assertEquals(eid2, result2.getExecutionId());
+ assertEquals(vid1, result1.getVertexId());
+ assertEquals(vid2, result2.getVertexId());
+
+ Map<ExecutionAttemptID, Task> tasks = tm.getAllRunningTasks();
+ assertEquals(2, tasks.size());
+
+ Task t1 = tasks.get(eid1);
+ Task t2 = tasks.get(eid2);
+ assertNotNull(t1);
+ assertNotNull(t2);
+
+ assertEquals(ExecutionState2.RUNNING, t1.getExecutionState());
+ assertEquals(ExecutionState2.RUNNING, t2.getExecutionState());
+
+ // cancel one task
+ assertTrue(tm.cancelTask(vid1, 1, eid1).isSuccess());
+ t1.getEnvironment().getExecutingThread().join();
+ assertEquals(ExecutionState2.CANCELED, t1.getExecutionState());
+
+ tasks = tm.getAllRunningTasks();
+ assertEquals(1, tasks.size());
+
+ // try to cancel a non-existent task
+ assertFalse(tm.cancelTask(vid1, 1, eid1).isSuccess());
+
+ // cancel the second task
+ assertTrue(tm.cancelTask(vid2, 2, eid2).isSuccess());
+ t2.getEnvironment().getExecutingThread().join();
+ assertEquals(ExecutionState2.CANCELED, t2.getExecutionState());
+
+ tasks = tm.getAllRunningTasks();
+ assertEquals(0, tasks.size());
+
+ // the class loaders should be de-registered
+ assertNull(LibraryCacheManager.getClassLoader(jid1));
+ assertNull(LibraryCacheManager.getClassLoader(jid2));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGateChannelEdgeMismatch() {
+ try {
+ JobManager jobManager = getJobManagerMockBase();
+
+ TaskManager tm = createTaskManager(jobManager);
+
+ JobID jid = new JobID();;
+
+ JobVertexID vid1 = new JobVertexID();
+ JobVertexID vid2 = new JobVertexID();
+
+ ExecutionAttemptID eid1 = new ExecutionAttemptID();
+ ExecutionAttemptID eid2 = new ExecutionAttemptID();
+
+ TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
+ new Configuration(), new Configuration(), Sender.class.getName(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ new String[0], 0);
+
+ TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
+ new Configuration(), new Configuration(), Receiver.class.getName(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ new String[0], 0);
+
+ LibraryCacheManager.register(jid, new String[0]);
+ LibraryCacheManager.register(jid, new String[0]);
+ assertNotNull(LibraryCacheManager.getClassLoader(jid));
+
+ assertFalse(tm.submitTask(tdd1).isSuccess());
+ assertFalse(tm.submitTask(tdd2).isSuccess());
+
+ Map<ExecutionAttemptID, Task> tasks = tm.getAllRunningTasks();
+ assertEquals(0, tasks.size());
+
+ // the class loaders should be de-registered
+ assertNull(LibraryCacheManager.getClassLoader(jid));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRunJobWithForwardChannel() {
+ try {
+ JobID jid = new JobID();
+
+ JobVertexID vid1 = new JobVertexID();
+ JobVertexID vid2 = new JobVertexID();
+
+ ExecutionAttemptID eid1 = new ExecutionAttemptID();
+ ExecutionAttemptID eid2 = new ExecutionAttemptID();
+
+ ChannelID senderId = new ChannelID();
+ ChannelID receiverId = new ChannelID();
+
+ JobManager jobManager = getJobManagerMockBase();
+ when(jobManager.lookupConnectionInfo(Matchers.any(InstanceConnectionInfo.class), Matchers.eq(jid), Matchers.eq(senderId)))
+ .thenReturn(ConnectionInfoLookupResponse.createReceiverFoundAndReady(receiverId));
+
+ TaskManager tm = createTaskManager(jobManager);
+
+ ChannelDeploymentDescriptor cdd = new ChannelDeploymentDescriptor(senderId, receiverId);
+
+ TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
+ new Configuration(), new Configuration(), Sender.class.getName(),
+ Collections.singletonList(new GateDeploymentDescriptor(Collections.singletonList(cdd))),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ new String[0], 0);
+
+ TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
+ new Configuration(), new Configuration(), Receiver.class.getName(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ Collections.singletonList(new GateDeploymentDescriptor(Collections.singletonList(cdd))),
+ new String[0], 0);
+
+ // register the job twice (for two tasks) at the lib cache
+ LibraryCacheManager.register(jid, new String[0]);
+ LibraryCacheManager.register(jid, new String[0]);
+ assertNotNull(LibraryCacheManager.getClassLoader(jid));
+
+ // deploy receiver before sender, so the target is online when the sender requests the connection info
+ TaskOperationResult result2 = tm.submitTask(tdd2);
+ TaskOperationResult result1 = tm.submitTask(tdd1);
+
+ assertTrue(result1.isSuccess());
+ assertTrue(result2.isSuccess());
+ assertEquals(eid1, result1.getExecutionId());
+ assertEquals(eid2, result2.getExecutionId());
+ assertEquals(vid1, result1.getVertexId());
+ assertEquals(vid2, result2.getVertexId());
+
+ Map<ExecutionAttemptID, Task> tasks = tm.getAllRunningTasks();
+
+ Task t1 = tasks.get(eid1);
+ Task t2 = tasks.get(eid2);
+
+ // wait until the tasks are done
+ if (t1 != null) {
+ t1.getEnvironment().getExecutingThread().join();
+ }
+ if (t2 != null) {
+ t2.getEnvironment().getExecutingThread().join();
+ }
+
+ assertEquals(ExecutionState2.FINISHED, t1.getExecutionState());
+ assertEquals(ExecutionState2.FINISHED, t2.getExecutionState());
+
+ tasks = tm.getAllRunningTasks();
+ assertEquals(0, tasks.size());
+
+ // the class loaders should be de-registered
+ assertNull(LibraryCacheManager.getClassLoader(jid));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static JobManager getJobManagerMockBase() {
+ JobManager jm = mock(JobManager.class);
+
+ final InstanceID iid = new InstanceID();
+
+ when(jm.registerTaskManager(Matchers.any(InstanceConnectionInfo.class), Matchers.any(HardwareDescription.class), Matchers.anyInt()))
+ .thenReturn(iid);
+
+ when(jm.sendHeartbeat(iid)).thenReturn(true);
+
+ return jm;
+ }
+
+ public static TaskManager createTaskManager(JobManager jm) throws Exception {
+ InetAddress localhost = InetAddress.getLoopbackAddress();
+ InetSocketAddress jmMockAddress = new InetSocketAddress(localhost, 55443);
+
+ Configuration cfg = new Configuration();
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
+ GlobalConfiguration.includeConfiguration(cfg);
+
+ return new TaskManager(ExecutionMode.LOCAL, jm, jm, jm, jm, jmMockAddress, localhost);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final class TestInvokableCorrect extends AbstractInvokable {
+
+ @Override
+ public void registerInputOutput() {}
+
+ @Override
+ public void invoke() {}
+ }
+
+ public static final class TestInvokableBlockingCancelable extends AbstractInvokable {
+
+ @Override
+ public void registerInputOutput() {}
+
+ @Override
+ public void invoke() throws Exception {
+ Object o = new Object();
+ synchronized (o) {
+ o.wait();
+ }
+ }
+ }
+
+ public static final class Sender extends AbstractInvokable {
+
+ private RecordWriter<IntegerRecord> writer;
+
+ @Override
+ public void registerInputOutput() {
+ writer = new RecordWriter<IntegerRecord>(this);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ writer.initializeSerializers();
+ writer.emit(new IntegerRecord(42));
+ writer.emit(new IntegerRecord(1337));
+ writer.flush();
+ }
+ }
+
+ public static final class Receiver extends AbstractInvokable {
+
+ private RecordReader<IntegerRecord> reader;
+
+ @Override
+ public void registerInputOutput() {
+ reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ IntegerRecord i1 = reader.next();
+ IntegerRecord i2 = reader.next();
+ IntegerRecord i3 = reader.next();
+
+ if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
+ throw new Exception("Wrong Data Received");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
new file mode 100644
index 0000000..d225483
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.RuntimeEnvironment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.protocols.AccumulatorProtocol;
+import org.apache.flink.util.ExceptionUtils;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+public class TaskTest {
+
+ @Test
+ public void testTaskStates() {
+ try {
+ final JobID jid = new JobID();
+ final JobVertexID vid = new JobVertexID();
+ final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+ final TaskManager taskManager = mock(TaskManager.class);
+ final RuntimeEnvironment env = mock(RuntimeEnvironment.class);
+
+ Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager);
+ task.setEnvironment(env);
+
+ assertEquals(ExecutionState2.DEPLOYING, task.getExecutionState());
+
+ // cancel
+ task.cancelExecution();
+ assertEquals(ExecutionState2.CANCELED, task.getExecutionState());
+
+ // cannot go into running or finished state
+
+ assertFalse(task.startExecution());
+ assertEquals(ExecutionState2.CANCELED, task.getExecutionState());
+
+ assertFalse(task.markAsFinished());
+ assertEquals(ExecutionState2.CANCELED, task.getExecutionState());
+
+ task.markFailed(new Exception("test"));
+ assertTrue(ExecutionState2.CANCELED == task.getExecutionState());
+
+ verify(taskManager, times(1)).notifyExecutionStateChange(jid, eid, ExecutionState2.CANCELED, null);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTaskStartFinish() {
+ try {
+ final JobID jid = new JobID();
+ final JobVertexID vid = new JobVertexID();
+ final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+ final TaskManager taskManager = mock(TaskManager.class);
+
+
+ final Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager);
+
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
+ Thread operation = new Thread() {
+ public void run() {
+ try {
+ assertTrue(task.markAsFinished());
+ }
+ catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+
+ final RuntimeEnvironment env = mock(RuntimeEnvironment.class);
+ when(env.getExecutingThread()).thenReturn(operation);
+
+ assertEquals(ExecutionState2.DEPLOYING, task.getExecutionState());
+
+ // start the execution
+ task.setEnvironment(env);
+ task.startExecution();
+
+ // wait for the execution to be finished
+ operation.join();
+
+ if (error.get() != null) {
+ ExceptionUtils.rethrow(error.get());
+ }
+
+ assertEquals(ExecutionState2.FINISHED, task.getExecutionState());
+
+ verify(taskManager).notifyExecutionStateChange(jid, eid, ExecutionState2.FINISHED, null);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTaskFailesInRunning() {
+ try {
+ final JobID jid = new JobID();
+ final JobVertexID vid = new JobVertexID();
+ final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+ final TaskManager taskManager = mock(TaskManager.class);
+
+ final Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager);
+
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
+ Thread operation = new Thread() {
+ public void run() {
+ try {
+ task.markFailed(new Exception("test exception message"));
+ }
+ catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+
+ final RuntimeEnvironment env = mock(RuntimeEnvironment.class);
+ when(env.getExecutingThread()).thenReturn(operation);
+
+ assertEquals(ExecutionState2.DEPLOYING, task.getExecutionState());
+
+ // start the execution
+ task.setEnvironment(env);
+ task.startExecution();
+
+ // wait for the execution to be finished
+ operation.join();
+
+ if (error.get() != null) {
+ ExceptionUtils.rethrow(error.get());
+ }
+
+ // make sure the final state is correct and the task manager knows the changes
+ assertEquals(ExecutionState2.FAILED, task.getExecutionState());
+ verify(taskManager).notifyExecutionStateChange(Matchers.eq(jid), Matchers.eq(eid), Matchers.eq(ExecutionState2.FAILED), Matchers.anyString());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTaskCanceledInRunning() {
+ try {
+ final JobID jid = new JobID();
+ final JobVertexID vid = new JobVertexID();
+ final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+ final TaskManager taskManager = mock(TaskManager.class);
+
+ final Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager);
+
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
+ // latches to create a deterministic order of events
+ final OneShotLatch toRunning = new OneShotLatch();
+ final OneShotLatch afterCanceling = new OneShotLatch();
+
+ Thread operation = new Thread() {
+ public void run() {
+ try {
+ toRunning.trigger();
+ afterCanceling.await();
+ assertFalse(task.markAsFinished());
+ task.cancelingDone();
+ }
+ catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+
+ final RuntimeEnvironment env = mock(RuntimeEnvironment.class);
+ when(env.getExecutingThread()).thenReturn(operation);
+
+ assertEquals(ExecutionState2.DEPLOYING, task.getExecutionState());
+
+ // start the execution
+ task.setEnvironment(env);
+ task.startExecution();
+
+ toRunning.await();
+ task.cancelExecution();
+ afterCanceling.trigger();
+
+ // wait for the execution to be finished
+ operation.join();
+
+ if (error.get() != null) {
+ ExceptionUtils.rethrow(error.get());
+ }
+
+ // make sure the final state is correct and the task manager knows the changes
+ assertEquals(ExecutionState2.CANCELED, task.getExecutionState());
+ verify(taskManager).notifyExecutionStateChange(jid, eid, ExecutionState2.CANCELED, null);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTaskWithEnvironment() {
+ try {
+ final JobID jid = new JobID();
+ final JobVertexID vid = new JobVertexID();
+ final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+ final TaskManager taskManager = mock(TaskManager.class);
+
+ TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
+ new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ new String[0], 0);
+
+ Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager);
+
+ RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, getClass().getClassLoader(),
+ mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class),
+ mock(AccumulatorProtocol.class));
+
+ task.setEnvironment(env);
+
+ assertEquals(ExecutionState2.DEPLOYING, task.getExecutionState());
+
+ task.startExecution();
+ task.getEnvironment().getExecutingThread().join();
+
+ assertEquals(ExecutionState2.FINISHED, task.getExecutionState());
+
+ verify(taskManager).notifyExecutionStateChange(jid, eid, ExecutionState2.FINISHED, null);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTaskWithEnvironmentAndException() {
+ try {
+ final JobID jid = new JobID();
+ final JobVertexID vid = new JobVertexID();
+ final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+ final TaskManager taskManager = mock(TaskManager.class);
+
+ TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
+ new Configuration(), new Configuration(), TestInvokableWithException.class.getName(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ Collections.<GateDeploymentDescriptor>emptyList(),
+ new String[0], 0);
+
+ Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager);
+
+ RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, getClass().getClassLoader(),
+ mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class),
+ mock(AccumulatorProtocol.class));
+
+ task.setEnvironment(env);
+
+ assertEquals(ExecutionState2.DEPLOYING, task.getExecutionState());
+
+ task.startExecution();
+ task.getEnvironment().getExecutingThread().join();
+
+ assertEquals(ExecutionState2.FAILED, task.getExecutionState());
+
+ verify(taskManager).notifyExecutionStateChange(Matchers.eq(jid), Matchers.eq(eid), Matchers.eq(ExecutionState2.FAILED), Matchers.anyString());
+ verify(taskManager, times(0)).notifyExecutionStateChange(jid, eid, ExecutionState2.CANCELING, null);
+ verify(taskManager, times(0)).notifyExecutionStateChange(jid, eid, ExecutionState2.CANCELED, null);
+ verify(taskManager, times(0)).notifyExecutionStateChange(jid, eid, ExecutionState2.FINISHED, null);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final class TestInvokableCorrect extends AbstractInvokable {
+
+ @Override
+ public void registerInputOutput() {}
+
+ @Override
+ public void invoke() {}
+ }
+
+ public static final class TestInvokableWithException extends AbstractInvokable {
+
+ @Override
+ public void registerInputOutput() {}
+
+ @Override
+ public void invoke() throws Exception {
+ throw new Exception("test exception");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/DoubleSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/DoubleSourceTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/DoubleSourceTask.java
deleted file mode 100644
index c98536e..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/DoubleSourceTask.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.testutils.tasks;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.fs.LineReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-
-public class DoubleSourceTask extends AbstractInvokable {
-
- private RecordWriter<StringRecord> output1 = null;
-
- private RecordWriter<StringRecord> output2 = null;
-
- @Override
- public void invoke() throws Exception {
- this.output1.initializeSerializers();
- this.output2.initializeSerializers();
-
- final Iterator<FileInputSplit> splitIterator = getInputSplits();
-
- while (splitIterator.hasNext()) {
-
- final FileInputSplit split = splitIterator.next();
-
- final long start = split.getStart();
- final long length = split.getLength();
-
- final FileSystem fs = FileSystem.get(split.getPath().toUri());
-
- final FSDataInputStream fdis = fs.open(split.getPath());
-
- final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
-
- byte[] line = lineReader.readLine();
-
- while (line != null) {
-
- // Create a string object from the data read
- StringRecord str = new StringRecord();
- str.set(line);
-
- // Send out string
- output1.emit(str);
- output2.emit(str);
-
- line = lineReader.readLine();
- }
-
- // Close the stream;
- lineReader.close();
- }
-
- this.output1.flush();
- this.output2.flush();
- }
-
- @Override
- public void registerInputOutput() {
- this.output1 = new RecordWriter<StringRecord>(this);
- this.output2 = new RecordWriter<StringRecord>(this);
- }
-
- private Iterator<FileInputSplit> getInputSplits() {
-
- final InputSplitProvider provider = getEnvironment().getInputSplitProvider();
-
- return new Iterator<FileInputSplit>() {
-
- private FileInputSplit nextSplit;
-
- private boolean exhausted;
-
- @Override
- public boolean hasNext() {
- if (exhausted) {
- return false;
- }
-
- if (nextSplit != null) {
- return true;
- }
-
- FileInputSplit split = (FileInputSplit) provider.getNextInputSplit();
-
- if (split != null) {
- this.nextSplit = split;
- return true;
- }
- else {
- exhausted = true;
- return false;
- }
- }
-
- @Override
- public FileInputSplit next() {
- if (this.nextSplit == null && !hasNext()) {
- throw new NoSuchElementException();
- }
-
- final FileInputSplit tmp = this.nextSplit;
- this.nextSplit = null;
- return tmp;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineReader.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineReader.java
deleted file mode 100644
index 500f9a1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineReader.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.testutils.tasks;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.fs.LineReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-
-/**
- * A file line reader reads the associated file input splits line by line and outputs the lines as string records.
- *
- */
-public class FileLineReader extends AbstractInvokable {
-
- private RecordWriter<StringRecord> output = null;
-
- @Override
- public void invoke() throws Exception {
-
- output.initializeSerializers();
-
- final Iterator<FileInputSplit> splitIterator = getInputSplits();
-
- while (splitIterator.hasNext()) {
-
- final FileInputSplit split = splitIterator.next();
-
- long start = split.getStart();
- long length = split.getLength();
-
- final FileSystem fs = FileSystem.get(split.getPath().toUri());
-
- final FSDataInputStream fdis = fs.open(split.getPath());
-
- final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
-
- byte[] line = lineReader.readLine();
-
- while (line != null) {
-
- // Create a string object from the data read
- StringRecord str = new StringRecord();
- str.set(line);
-
- // Send out string
- output.emit(str);
-
- line = lineReader.readLine();
- }
-
- // Close the stream;
- lineReader.close();
- }
-
- this.output.flush();
- }
-
- @Override
- public void registerInputOutput() {
- output = new RecordWriter<StringRecord>(this);
- }
-
- private Iterator<FileInputSplit> getInputSplits() {
-
- final InputSplitProvider provider = getEnvironment().getInputSplitProvider();
-
- return new Iterator<FileInputSplit>() {
-
- private FileInputSplit nextSplit;
-
- private boolean exhausted;
-
- @Override
- public boolean hasNext() {
- if (exhausted) {
- return false;
- }
-
- if (nextSplit != null) {
- return true;
- }
-
- FileInputSplit split = (FileInputSplit) provider.getNextInputSplit();
-
- if (split != null) {
- this.nextSplit = split;
- return true;
- }
- else {
- exhausted = true;
- return false;
- }
- }
-
- @Override
- public FileInputSplit next() {
- if (this.nextSplit == null && !hasNext()) {
- throw new NoSuchElementException();
- }
-
- final FileInputSplit tmp = this.nextSplit;
- this.nextSplit = null;
- return tmp;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineWriter.java
deleted file mode 100644
index 529ae06..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineWriter.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.testutils.tasks;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * A file line writer reads string records its input gate and writes them to the associated output file.
- *
- */
-public class FileLineWriter extends AbstractInvokable {
- /**
- * The record reader through which incoming string records are received.
- */
- private RecordReader<StringRecord> input = null;
-
-
- @Override
- public void invoke() throws Exception {
-
- final Configuration conf = getEnvironment().getTaskConfiguration();
- final String outputPathString = conf.getString(JobFileOutputVertex.PATH_PROPERTY, null);
-
- Path outputPath = new Path(outputPathString);
-
- FileSystem fs = FileSystem.get(outputPath.toUri());
- if (fs.exists(outputPath)) {
- FileStatus status = fs.getFileStatus(outputPath);
-
- if (status.isDir()) {
- outputPath = new Path(outputPath.toUri().toString() + "/file_" + getIndexInSubtaskGroup() + ".txt");
- }
- }
-
- final FSDataOutputStream outputStream = fs.create(outputPath, true);
-
- while (this.input.hasNext()) {
-
- StringRecord record = this.input.next();
- byte[] recordByte = (record.toString() + "\r\n").getBytes();
- outputStream.write(recordByte, 0, recordByte.length);
- }
-
- outputStream.close();
-
- }
-
- @Override
- public void registerInputOutput() {
- this.input = new RecordReader<StringRecord>(this, StringRecord.class);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileInputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileInputVertex.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileInputVertex.java
deleted file mode 100644
index 944e8f1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileInputVertex.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.testutils.tasks;
-
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.jobgraph.AbstractJobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-
-public final class JobFileInputVertex extends AbstractJobInputVertex {
-
- /**
- * The fraction that the last split may be larger than the others.
- */
- private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
-
- /**
- * The path pointing to the input file/directory.
- */
- private Path path;
-
-
- public JobFileInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
- super(name, id, jobGraph);
- }
-
- /**
- * Creates a new job file input vertex with the specified name.
- *
- * @param name
- * the name of the new job file input vertex
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobFileInputVertex(String name, JobGraph jobGraph) {
- this(name, null, jobGraph);
- }
-
- /**
- * Creates a new job file input vertex.
- *
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobFileInputVertex(JobGraph jobGraph) {
- this(null, jobGraph);
- }
-
- /**
- * Sets the path of the file the job file input vertex's task should read from.
- *
- * @param path
- * the path of the file the job file input vertex's task should read from
- */
- public void setFilePath(final Path path) {
- this.path = path;
- }
-
- /**
- * Returns the path of the file the job file input vertex's task should read from.
- *
- * @return the path of the file the job file input vertex's task should read from or <code>null</code> if no path
- * has yet been set
- */
- public Path getFilePath() {
- return this.path;
- }
-
- @Override
- public void read(final DataInputView in) throws IOException {
- super.read(in);
-
- // Read path of the input file
- final boolean isNotNull = in.readBoolean();
- if (isNotNull) {
- this.path = new Path();
- this.path.read(in);
- }
- }
-
- @Override
- public void write(final DataOutputView out) throws IOException {
- super.write(out);
-
- // Write out the path of the input file
- if (this.path == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- this.path.write(out);
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
-
- @Override
- public InputSplit[] getInputSplits(int minNumSplits) throws Exception {
- final Path path = this.path;
- final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>();
-
- // get all the files that are involved in the splits
- final List<FileStatus> files = new ArrayList<FileStatus>();
- long totalLength = 0;
-
- final FileSystem fs = path.getFileSystem();
- final FileStatus pathFile = fs.getFileStatus(path);
-
- if (pathFile.isDir()) {
- // input is directory. list all contained files
- final FileStatus[] dir = fs.listStatus(path);
- for (int i = 0; i < dir.length; i++) {
- if (!dir[i].isDir()) {
- files.add(dir[i]);
- totalLength += dir[i].getLen();
- }
- }
-
- } else {
- files.add(pathFile);
- totalLength += pathFile.getLen();
- }
-
- final long minSplitSize = 1;
- final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE : (totalLength / minNumSplits +
- (totalLength % minNumSplits == 0 ? 0 : 1));
-
- // now that we have the files, generate the splits
- int splitNum = 0;
- for (final FileStatus file : files) {
-
- final long len = file.getLen();
- final long blockSize = file.getBlockSize();
-
- final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
- final long halfSplit = splitSize >>> 1;
-
- final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
-
- if (len > 0) {
-
- // get the block locations and make sure they are in order with respect to their offset
- final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
- Arrays.sort(blocks);
-
- long bytesUnassigned = len;
- long position = 0;
-
- int blockIndex = 0;
-
- while (bytesUnassigned > maxBytesForLastSplit) {
- // get the block containing the majority of the data
- blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
- // create a new split
- final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position, splitSize,
- blocks[blockIndex]
- .getHosts());
- inputSplits.add(fis);
-
- // adjust the positions
- position += splitSize;
- bytesUnassigned -= splitSize;
- }
-
- // assign the last split
- if (bytesUnassigned > 0) {
- blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
- final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position,
- bytesUnassigned,
- blocks[blockIndex].getHosts());
- inputSplits.add(fis);
- }
- } else {
- // special case with a file of zero bytes size
- final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
- String[] hosts;
- if (blocks.length > 0) {
- hosts = blocks[0].getHosts();
- } else {
- hosts = new String[0];
- }
- final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0, hosts);
- inputSplits.add(fis);
- }
- }
-
- return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
- }
-
- /**
- * Retrieves the index of the <tt>BlockLocation</tt> that contains the part of the file described by the given
- * offset.
- *
- * @param blocks
- * The different blocks of the file. Must be ordered by their offset.
- * @param offset
- * The offset of the position in the file.
- * @param startIndex
- * The earliest index to look at.
- * @return The index of the block containing the given position.
- */
- private final int getBlockIndexForPosition(final BlockLocation[] blocks, final long offset,
- final long halfSplitSize, final int startIndex) {
-
- // go over all indexes after the startIndex
- for (int i = startIndex; i < blocks.length; i++) {
- long blockStart = blocks[i].getOffset();
- long blockEnd = blockStart + blocks[i].getLength();
-
- if (offset >= blockStart && offset < blockEnd) {
- // got the block where the split starts
- // check if the next block contains more than this one does
- if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
- return i + 1;
- } else {
- return i;
- }
- }
- }
- throw new IllegalArgumentException("The given offset is not contained in the any block.");
- }
-
-
- @Override
- public Class<FileInputSplit> getInputSplitType() {
- return FileInputSplit.class;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileOutputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileOutputVertex.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileOutputVertex.java
deleted file mode 100644
index 6f8ce85..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileOutputVertex.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.testutils.tasks;
-
-import java.io.IOException;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.jobgraph.AbstractJobOutputVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-
-public class JobFileOutputVertex extends AbstractJobOutputVertex {
-
- public static final String PATH_PROPERTY = "outputPath";
-
- /**
- * The path pointing to the output file/directory.
- */
- private Path path;
-
-
- public JobFileOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
- super(name, id, jobGraph);
- }
-
- /**
- * Creates a new job file output vertex with the specified name.
- *
- * @param name
- * the name of the new job file output vertex
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobFileOutputVertex(String name, JobGraph jobGraph) {
- this(name, null, jobGraph);
- }
-
- /**
- * Creates a new job file input vertex.
- *
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobFileOutputVertex(JobGraph jobGraph) {
- this(null, jobGraph);
- }
-
- /**
- * Sets the path of the file the job file input vertex's task should write to.
- *
- * @param path
- * the path of the file the job file input vertex's task should write to
- */
- public void setFilePath(Path path) {
- this.path = path;
- getConfiguration().setString(PATH_PROPERTY, path.toString());
- }
-
- /**
- * Returns the path of the file the job file output vertex's task should write to.
- *
- * @return the path of the file the job file output vertex's task should write to or <code>null</code> if no path
- * has yet been set
- */
- public Path getFilePath() {
- return this.path;
- }
-
- @Override
- public void read(final DataInputView in) throws IOException {
- super.read(in);
-
- // Read path of the input file
- boolean isNotNull = in.readBoolean();
- if (isNotNull) {
- this.path = new Path();
- this.path.read(in);
- }
- }
-
- @Override
- public void write(final DataOutputView out) throws IOException {
- super.write(out);
-
- // Write out the path of the input file
- if (this.path == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- this.path.write(out);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java
new file mode 100644
index 0000000..f55f1cf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.util;
+
+import static org.junit.Assert.assertTrue;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.util.TaskConfig.DelegatingConfiguration;
+import org.junit.Test;
+
+
+public class DelegatingConfigurationTest {
+
+    /**
+     * Checks that every public method declared by {@link Configuration} has a counterpart with
+     * the same name and the same parameter types declared by {@link DelegatingConfiguration}.
+     * Only the signatures are compared via reflection; no method is invoked.
+     *
+     * Based on
+     * http://stackoverflow.com/questions/22225663/checking-in-a-unit-test-whether-all-methods-are-delegated
+     */
+    @Test
+    public void testIfDelegatesImplementAllMethods() throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+
+        // Order by name plus parameter types so that methods are checked (and a failure is
+        // reported) in a stable order, independent of the order reflection returns them in.
+        Comparator<Method> methodComparator = new Comparator<Method>() {
+            @Override
+            public int compare(Method o1, Method o2) {
+                String o1Str = o1.getName() + typeParamToString(o1.getParameterTypes());
+                String o2Str = o2.getName() + typeParamToString(o2.getParameterTypes());
+                return o1Str.compareTo(o2Str);
+            }
+
+            private String typeParamToString(Class<?>[] classes) {
+                StringBuilder str = new StringBuilder();
+                for (Class<?> t : classes) {
+                    str.append(t.toString());
+                }
+                return str.toString();
+            }
+        };
+
+        Method[] confMethods = Configuration.class.getDeclaredMethods();
+        Method[] delegateMethods = DelegatingConfiguration.class.getDeclaredMethods();
+        Arrays.sort(confMethods, methodComparator);
+        Arrays.sort(delegateMethods, methodComparator);
+
+        // For each public method in the Configuration class...
+        for (Method configurationMethod : confMethods) {
+            if (!Modifier.isPublic(configurationMethod.getModifiers())) {
+                continue;
+            }
+
+            // ...look for a method with an identical signature in the wrapper class.
+            boolean hasMethod = false;
+            for (Method wrapperMethod : delegateMethods) {
+                // A name match with different parameters is just another overload, so the
+                // search has to go on instead of giving up at the first same-named method.
+                if (configurationMethod.getName().equals(wrapperMethod.getName())
+                        && Arrays.equals(configurationMethod.getParameterTypes(), wrapperMethod.getParameterTypes())) {
+                    hasMethod = true;
+                    // Leave only the inner loop: the remaining Configuration methods
+                    // must still be verified.
+                    break;
+                }
+            }
+            assertTrue("Configuration method '" + configurationMethod.getName() + "' has not been wrapped correctly in DelegatingConfiguration wrapper", hasMethod);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestDelegatingConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestDelegatingConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestDelegatingConfiguration.java
deleted file mode 100644
index 5fdf433..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestDelegatingConfiguration.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.util;
-
-import static org.junit.Assert.assertTrue;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.Comparator;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.util.TaskConfig.DelegatingConfiguration;
-import org.junit.Test;
-
-
-public class TestDelegatingConfiguration {
-
- /**
- * http://stackoverflow.com/questions/22225663/checking-in-a-unit-test-whether-all-methods-are-delegated
- */
- @Test
- public void testIfDelegatesImplementAllMethods() throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
-
- Comparator<Method> methodComparator = new Comparator<Method>() {
- @Override
- public int compare(Method o1, Method o2) {
- String o1Str = o1.getName() + typeParamToString(o1.getParameterTypes());
- String o2Str = o2.getName() + typeParamToString(o2.getParameterTypes());
- return o1Str.compareTo( o2Str );
- }
-
- private String typeParamToString(Class<?>[] classes) {
- String str = "";
- for(Object t : classes) {
- str += t.toString();
- }
- return str;
- }
- };
-
- // For each method in the Configuration class...
- Method[] confMethods = Configuration.class.getDeclaredMethods();
- Method[] delegateMethods = DelegatingConfiguration.class.getDeclaredMethods();
- Arrays.sort(confMethods, methodComparator);
- Arrays.sort(delegateMethods, methodComparator);
- match : for (Method configurationMethod : confMethods) {
- boolean hasMethod = false;
- if(!Modifier.isPublic(configurationMethod.getModifiers()) ) {
- continue;
- }
- // Find matching method in wrapper class and call it
- mismatch: for (Method wrapperMethod : delegateMethods) {
- if (configurationMethod.getName().equals(wrapperMethod.getName())) {
-
- // Get parameters for method
- Class<?>[] wrapperMethodParams = wrapperMethod.getParameterTypes();
- Class<?>[] configMethodParams = configurationMethod.getParameterTypes();
- if(wrapperMethodParams.length != configMethodParams.length) {
- System.err.println("Length");
- break mismatch;
- }
- for(int i = 0; i < wrapperMethodParams.length; i++) {
- if(wrapperMethodParams[i] != configMethodParams[i]) {
- break mismatch;
- }
- }
- hasMethod = true;
- break match;
- }
- }
- assertTrue("Foo method '" + configurationMethod.getName() + "' has not been wrapped correctly in DelegatingConfiguration wrapper", hasMethod);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
index a409222..33112af 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
@@ -227,9 +227,9 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
// -------------------------------------------------------------------------------------------------------------
@SuppressWarnings("unchecked")
- private static InputFormatInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+ private static InputFormatVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
CsvInputFormat pointsInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class, LongValue.class, LongValue.class);
- InputFormatInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "Input[Points]", jobGraph, numSubTasks);
+ InputFormatVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "Input[Points]", jobGraph, numSubTasks);
{
TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
@@ -241,9 +241,9 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
}
@SuppressWarnings("unchecked")
- private static InputFormatInputVertex createModelsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+ private static InputFormatVertex createModelsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
CsvInputFormat modelsInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class, LongValue.class, LongValue.class);
- InputFormatInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, pointsPath, "Input[Models]", jobGraph, numSubTasks);
+ InputFormatVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, pointsPath, "Input[Models]", jobGraph, numSubTasks);
{
TaskConfig taskConfig = new TaskConfig(modelsInput.getConfiguration());
@@ -278,8 +278,8 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
return pointsInput;
}
- private static OutputFormatOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
- OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
+ private static OutputFormatVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+ OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
{
TaskConfig taskConfig = new TaskConfig(output.getConfiguration());
@@ -308,10 +308,10 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
JobGraph jobGraph = new JobGraph("Distance Builder");
// -- vertices ---------------------------------------------------------------------------------------------
- InputFormatInputVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
- InputFormatInputVertex models = createModelsInput(jobGraph, centersPath, numSubTasks, serializer);
+ InputFormatVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
+ InputFormatVertex models = createModelsInput(jobGraph, centersPath, numSubTasks, serializer);
JobTaskVertex mapper = createMapper(jobGraph, numSubTasks, serializer);
- OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+ OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
// -- edges ------------------------------------------------------------------------------------------------
JobGraphUtils.connect(points, mapper, ChannelType.NETWORK, DistributionPattern.POINTWISE);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
index 4d46b16..678a7e5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -32,12 +32,9 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
+import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobTaskVertex;
-import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
-import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
import org.apache.flink.runtime.operators.CollectorMapDriver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.GroupReduceDriver;
@@ -96,10 +93,10 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
// Job vertex builder methods
// -------------------------------------------------------------------------------------------------------------
- private static InputFormatInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+ private static InputFormatVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
@SuppressWarnings("unchecked")
CsvInputFormat pointsInFormat = new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class);
- InputFormatInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "[Points]", jobGraph, numSubTasks);
+ InputFormatVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "[Points]", jobGraph, numSubTasks);
{
TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
@@ -117,10 +114,10 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
return pointsInput;
}
- private static InputFormatInputVertex createCentersInput(JobGraph jobGraph, String centersPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+ private static InputFormatVertex createCentersInput(JobGraph jobGraph, String centersPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
@SuppressWarnings("unchecked")
CsvInputFormat modelsInFormat = new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class);
- InputFormatInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, centersPath, "[Models]", jobGraph, numSubTasks);
+ InputFormatVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, centersPath, "[Models]", jobGraph, numSubTasks);
{
TaskConfig taskConfig = new TaskConfig(modelsInput.getConfiguration());
@@ -139,9 +136,9 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
return modelsInput;
}
- private static OutputFormatOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+ private static OutputFormatVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
- OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
+ OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
{
TaskConfig taskConfig = new TaskConfig(output.getConfiguration());
@@ -250,13 +247,13 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
tailConfig.setOutputSerializer(outputSerializer);
// the udf
- tailConfig.setStubWrapper(new UserCodeObjectWrapper<WrappingReduceFunction>(new WrappingReduceFunction(new RecomputeClusterCenter())));
+ tailConfig.setStubWrapper(new UserCodeObjectWrapper<RecomputeClusterCenter>(new RecomputeClusterCenter()));
return tail;
}
- private static SimpleOutputVertex createSync(JobGraph jobGraph, int numIterations, int dop) {
- SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, dop);
+ private static OutputFormatVertex createSync(JobGraph jobGraph, int numIterations, int dop) {
+ OutputFormatVertex sync = JobGraphUtils.createSync(jobGraph, dop);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
syncConfig.setNumberOfIterations(numIterations);
syncConfig.setIterationId(ITERATION_ID);
@@ -277,19 +274,19 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
JobGraph jobGraph = new JobGraph("KMeans Iterative");
// -- vertices ---------------------------------------------------------------------------------------------
- InputFormatInputVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
- InputFormatInputVertex centers = createCentersInput(jobGraph, centersPath, numSubTasks, serializer);
+ InputFormatVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
+ InputFormatVertex centers = createCentersInput(jobGraph, centersPath, numSubTasks, serializer);
JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer);
JobTaskVertex mapper = createMapper(jobGraph, numSubTasks, serializer, serializer, serializer, int0Comparator);
JobTaskVertex reducer = createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer);
- SimpleOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
+ OutputFormatVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
- SimpleOutputVertex sync = createSync(jobGraph, numIterations, numSubTasks);
+ OutputFormatVertex sync = createSync(jobGraph, numIterations, numSubTasks);
- OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+ OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
// -- edges ------------------------------------------------------------------------------------------------
JobGraphUtils.connect(points, mapper, ChannelType.NETWORK, DistributionPattern.POINTWISE);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 8da4e5c..dad2370 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -174,12 +174,12 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
// Invariant vertices across all variants
// -----------------------------------------------------------------------------------------------------------------
- private static InputFormatInputVertex createVerticesInput(JobGraph jobGraph, String verticesPath, int numSubTasks,
+ private static InputFormatVertex createVerticesInput(JobGraph jobGraph, String verticesPath, int numSubTasks,
TypeSerializerFactory<?> serializer,
TypeComparatorFactory<?> comparator) {
@SuppressWarnings("unchecked")
CsvInputFormat verticesInFormat = new CsvInputFormat(' ', LongValue.class);
- InputFormatInputVertex verticesInput = JobGraphUtils.createInput(verticesInFormat, verticesPath, "VerticesInput",
+ InputFormatVertex verticesInput = JobGraphUtils.createInput(verticesInFormat, verticesPath, "VerticesInput",
jobGraph, numSubTasks);
TaskConfig verticesInputConfig = new TaskConfig(verticesInput.getConfiguration());
{
@@ -327,9 +327,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
return intermediate;
}
- private static OutputFormatOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks,
+ private static OutputFormatVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks,
TypeSerializerFactory<?> serializer) {
- OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks);
+ OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks);
TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
{
@@ -352,14 +352,14 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
return output;
}
- private static SimpleOutputVertex createFakeTail(JobGraph jobGraph, int numSubTasks) {
- SimpleOutputVertex fakeTailOutput =
+ private static OutputFormatVertex createFakeTail(JobGraph jobGraph, int numSubTasks) {
+ OutputFormatVertex fakeTailOutput =
JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
return fakeTailOutput;
}
- private static SimpleOutputVertex createSync(JobGraph jobGraph, int numSubTasks, int maxIterations) {
- SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
+ private static OutputFormatVertex createSync(JobGraph jobGraph, int numSubTasks, int maxIterations) {
+ OutputFormatVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
syncConfig.setNumberOfIterations(maxIterations);
syncConfig.setIterationId(ITERATION_ID);
@@ -389,16 +389,16 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)");
// -- invariant vertices -----------------------------------------------------------------------------------
- InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
- InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
+ InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
+ InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
JobTaskVertex intermediate = createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
- OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
- SimpleOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
- SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
+ OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+ OutputFormatVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
+ OutputFormatVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
// --------------- the tail (solution set join) ---------------
JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
@@ -473,8 +473,8 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)");
// input
- InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
- InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
+ InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
+ InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
// head
JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
@@ -486,10 +486,10 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
// output and auxiliaries
- OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
- SimpleOutputVertex ssFakeTail = createFakeTail(jobGraph, numSubTasks);
- SimpleOutputVertex wsFakeTail = createFakeTail(jobGraph, numSubTasks);
- SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
+ OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+ OutputFormatVertex ssFakeTail = createFakeTail(jobGraph, numSubTasks);
+ OutputFormatVertex wsFakeTail = createFakeTail(jobGraph, numSubTasks);
+ OutputFormatVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
// ------------------ the intermediate (ss join) ----------------------
JobTaskVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
@@ -624,8 +624,8 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Workset Update, Solution Set Tail)");
// input
- InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
- InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
+ InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
+ InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
// head
JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
@@ -637,9 +637,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
// output and auxiliaries
- OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
- SimpleOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
- SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
+ OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+ OutputFormatVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
+ OutputFormatVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
// ------------------ the intermediate (ws update) ----------------------
JobTaskVertex wsUpdateIntermediate =
@@ -750,8 +750,8 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Solution Set Update, Workset Tail)");
// input
- InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
- InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
+ InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
+ InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
// head
JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
@@ -761,9 +761,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
// output and auxiliaries
- OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
- SimpleOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
- SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
+ OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+ OutputFormatVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
+ OutputFormatVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
// ------------------ the intermediate (ss update) ----------------------
JobTaskVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,