Posted to commits@flink.apache.org by se...@apache.org on 2014/11/26 18:51:26 UTC

[2/3] incubator-flink git commit: [FLINK-1278] [runtime] (part 1) Remove special code paths for the Record data type in the input readers and the source task.

[FLINK-1278] [runtime] (part 1) Remove special code paths for the Record data type in the input readers and the source task.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/48b6d01c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/48b6d01c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/48b6d01c

Branch: refs/heads/master
Commit: 48b6d01c2ec73b10bcb3d668dd0bac4dc715f524
Parents: d858930
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 26 17:18:55 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 26 18:14:59 2014 +0100

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java |   1 -
 .../flink/runtime/operators/DataSinkTask.java   |  22 +---
 .../flink/runtime/operators/DataSourceTask.java | 109 ++++++-------------
 .../runtime/operators/RegularPactTask.java      |  20 +---
 .../operators/util/RecordReaderIterator.java    |  57 ----------
 .../operators/chaining/ChainTaskTest.java       |   1 +
 .../operators/testutils/MockEnvironment.java    |  15 ++-
 7 files changed, 55 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
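
For context, the net effect in both DataSinkTask and RegularPactTask is that the input iterator is now always built through the generic ReaderIterator path, with the Record-specific branch (and the RecordReaderIterator class backing it) removed. A condensed sketch of the remaining pattern, with identifiers taken from the hunks below and abbreviated here purely for illustration:

    // generic data type deserialization; the former Record.class special case
    // (which wrapped a RecordReaderIterator) no longer exists
    MutableReader<DeserializationDelegate<?>> reader =
            (MutableReader<DeserializationDelegate<?>>) inputReader;
    final MutableObjectIterator<?> iter =
            new ReaderIterator(reader, serializerFactory.getSerializer());

See the individual file diffs below for the exact changes.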


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index bb537b7..9c2efb3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -851,7 +851,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
 
 		vertex.setInvokableClass(DataSinkTask.class);
-		vertex.getConfiguration().setInteger(DataSinkTask.DEGREE_OF_PARALLELISM_KEY, node.getDegreeOfParallelism());
 		vertex.setFormatDescription(getDescriptionForUserCode(node.getPactContract().getUserCodeWrapper()));
 		
 		// set user code

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index b1185c2..74c625f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -37,22 +37,17 @@ import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubExcepti
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.CloseableInputProvider;
 import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.operators.util.RecordReaderIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**
- * DataSinkTask which is executed by a Flink task manager.
- * The task hands the data to an output format.
+ * DataSinkTask which is executed by a task manager. The task hands the data to an output format.
  * 
  * @see OutputFormat
  */
 public class DataSinkTask<IT> extends AbstractInvokable {
 	
-	public static final String DEGREE_OF_PARALLELISM_KEY = "sink.dop";
-	
 	// Obtain DataSinkTask Logger
 	private static final Logger LOG = LoggerFactory.getLogger(DataSinkTask.class);
 
@@ -339,17 +334,10 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 		
 		this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader());
 		
-		if (this.inputTypeSerializerFactory.getDataType() == Record.class) {
-			// record specific deserialization
-			MutableReader<Record> reader = (MutableReader<Record>) inputReader;
-			this.reader = (MutableObjectIterator<IT>)new RecordReaderIterator(reader);
-		} else {
-			// generic data type serialization
-			MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
-			@SuppressWarnings({ "rawtypes" })
-			final MutableObjectIterator<?> iter = new ReaderIterator(reader, this.inputTypeSerializerFactory.getSerializer());
-			this.reader = (MutableObjectIterator<IT>)iter;
-		}
+		MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
+		@SuppressWarnings({ "rawtypes" })
+		final MutableObjectIterator<?> iter = new ReaderIterator(reader, this.inputTypeSerializerFactory.getSerializer());
+		this.reader = (MutableObjectIterator<IT>)iter;
 		
 		// final sanity check
 		if (numGates != this.config.getNumInputs()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index bfd2507..2db652f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import java.util.ArrayList;
@@ -41,13 +40,11 @@ import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.shipping.OutputCollector;
-import org.apache.flink.runtime.operators.shipping.RecordOutputCollector;
 import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
 /**
- * DataSourceTask which is executed by a Nephele task manager. The task reads data and uses an 
+ * DataSourceTask which is executed by a task manager. The task reads data and uses an 
  * {@link InputFormat} to create records from the input.
  * 
  * @see org.apache.flink.api.common.io.InputFormat
@@ -140,80 +137,44 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 				}
 				
 				try {
-					// ======= special-case the Record, to help the JIT and avoid some casts ======
-					if (record.getClass() == Record.class) {
-						Record typedRecord = (Record) record;
-						@SuppressWarnings("unchecked")
-						final InputFormat<Record, InputSplit> inFormat = (InputFormat<Record, InputSplit>) format;
+					// special case to make the loops tighter
+					if (this.output instanceof OutputCollector) {
+						final OutputCollector<OT> output = (OutputCollector<OT>) this.output;
 						
-						if (this.output instanceof RecordOutputCollector) {
-							// Record going directly into network channels
-							final RecordOutputCollector output = (RecordOutputCollector) this.output;
-							while (!this.taskCanceled && !inFormat.reachedEnd()) {
-								// build next pair and ship pair if it is valid
-								typedRecord.clear();
-								Record returnedRecord = null;
-								if ((returnedRecord = inFormat.nextRecord(typedRecord)) != null) {
-									output.collect(returnedRecord);
-								}
-							}
-						} else if (this.output instanceof ChainedCollectorMapDriver) {
-							// Record going to a chained map task
-							@SuppressWarnings("unchecked")
-							final ChainedCollectorMapDriver<Record, ?> output = (ChainedCollectorMapDriver<Record, ?>) this.output;
+						// as long as there is data to read
+						while (!this.taskCanceled && !format.reachedEnd()) {
 							
-							// as long as there is data to read
-							while (!this.taskCanceled && !inFormat.reachedEnd()) {
-								// build next pair and ship pair if it is valid
-								typedRecord.clear();
-								if ((typedRecord = inFormat.nextRecord(typedRecord)) != null) {
-									// This is where map of UDF gets called
-									output.collect(typedRecord);
-								}
-							}
-						} else {
-							// Record going to some other chained task
-							@SuppressWarnings("unchecked")
-							final Collector<Record> output = (Collector<Record>) this.output;
-							// as long as there is data to read
-							while (!this.taskCanceled && !inFormat.reachedEnd()) {
-								// build next pair and ship pair if it is valid
-								typedRecord.clear();
-								if ((typedRecord = inFormat.nextRecord(typedRecord)) != null){
-									output.collect(typedRecord);
-								}
+							OT returned;
+							if ((returned = format.nextRecord(record)) != null) {
+								output.collect(returned);
+								record = returned;
 							}
 						}
-					} else {
-						// general types. we make a case distinction here for the common cases, in order to help
-						// JIT method inlining
-						if (this.output instanceof OutputCollector) {
-							final OutputCollector<OT> output = (OutputCollector<OT>) this.output;
-							// as long as there is data to read
-							while (!this.taskCanceled && !format.reachedEnd()) {
-								// build next pair and ship pair if it is valid
-								if ((record = format.nextRecord(record)) != null) {
-									output.collect(record);
-								}
-							}
-						} else if (this.output instanceof ChainedCollectorMapDriver) {
-							@SuppressWarnings("unchecked")
-							final ChainedCollectorMapDriver<OT, ?> output = (ChainedCollectorMapDriver<OT, ?>) this.output;
-							// as long as there is data to read
-							while (!this.taskCanceled && !format.reachedEnd()) {
-								// build next pair and ship pair if it is valid
-								if ((record = format.nextRecord(record)) != null) {
-									output.collect(record);
-								}
+					}
+					else if (this.output instanceof ChainedCollectorMapDriver) {
+						@SuppressWarnings("unchecked")
+						final ChainedCollectorMapDriver<OT, ?> output = (ChainedCollectorMapDriver<OT, ?>) this.output;
+						
+						// as long as there is data to read
+						while (!this.taskCanceled && !format.reachedEnd()) {
+							
+							OT returned;
+							if ((returned = format.nextRecord(record)) != null) {
+								output.collect(returned);
+								record = returned;
 							}
-						} else {
-							final Collector<OT> output = this.output;
-							// as long as there is data to read
-							while (!this.taskCanceled && !format.reachedEnd()) {
-								// build next pair and ship pair if it is valid
-								if ((record = format.nextRecord(record)) != null) {
-									output.collect(record);
-								}
+						}
+					}
+					else {
+						final Collector<OT> output = this.output;
+						
+						// as long as there is data to read
+						while (!this.taskCanceled && !format.reachedEnd()) {
+							
+							OT returned;
+							if ((returned = format.nextRecord(record)) != null) {
+								output.collect(returned);
+								record = returned;
 							}
 						}
 					}
@@ -279,7 +240,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 	
 	/**
 	 * Initializes the InputFormat implementation and configuration.
-l	 * 
+	 * 
 	 * @throws RuntimeException
 	 *         Throws if instance of InputFormat implementation can not be
 	 *         obtained.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index c1037b5..2a598ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -59,7 +59,6 @@ import org.apache.flink.runtime.operators.util.CloseableInputProvider;
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.operators.util.RecordReaderIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -1031,20 +1030,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	}
 	
 	protected MutableObjectIterator<?> createInputIterator(MutableReader<?> inputReader, TypeSerializerFactory<?> serializerFactory) {
-
-		if (serializerFactory.getDataType().equals(Record.class)) {
-			// record specific deserialization
-			@SuppressWarnings("unchecked")
-			MutableReader<Record> reader = (MutableReader<Record>) inputReader;
-			return new RecordReaderIterator(reader);
-		} else {
-			// generic data type serialization
-			@SuppressWarnings("unchecked")
-			MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
-			@SuppressWarnings({ "unchecked", "rawtypes" })
-			final MutableObjectIterator<?> iter = new ReaderIterator(reader, serializerFactory.getSerializer());
-			return iter;
-		}
+		@SuppressWarnings("unchecked")
+		MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		final MutableObjectIterator<?> iter = new ReaderIterator(reader, serializerFactory.getSerializer());
+		return iter;
 	}
 
 	protected int getNumTaskInputs() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/RecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/RecordReaderIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/RecordReaderIterator.java
deleted file mode 100644
index d95a42c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/RecordReaderIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.util;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.io.network.api.MutableReader;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.MutableObjectIterator;
-
-/**
-* A {@link MutableObjectIterator} that wraps a Nephele Reader producing {@link Record}s.
-*/
-public final class RecordReaderIterator implements MutableObjectIterator<Record> {
-	
-	private final MutableReader<Record> reader;		// the source
-
-	/**
-	 * Creates a new iterator, wrapping the given reader.
-	 * 
-	 * @param reader The reader to wrap.
-	 */
-	public RecordReaderIterator(MutableReader<Record> reader) {
-		this.reader = reader;
-	}
-
-	@Override
-	public Record next(Record reuse) throws IOException {
-		try {
-			if (this.reader.next(reuse)) {
-				return reuse;
-			} else {
-				return null;
-			}
-		}
-		catch (InterruptedException e) {
-			throw new IOException("Reader interrupted.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 4c08ebd..0e029a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -146,6 +146,7 @@ public class ChainTaskTest extends TaskTestBase {
 				// driver
 				combineConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
 				combineConfig.setDriverComparator(compFact, 0);
+				combineConfig.setDriverComparator(compFact, 1);
 				combineConfig.setRelativeMemoryDriver(memoryFraction);
 				
 				// udf

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48b6d01c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 60a81c1..9916b61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.protocols.AccumulatorProtocol;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
@@ -69,7 +70,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 
 	private final Configuration taskConfiguration;
 
-	private final List<InputGate<Record>> inputs;
+	private final List<InputGate<DeserializationDelegate<Record>>> inputs;
 
 	private final List<OutputGate> outputs;
 
@@ -83,7 +84,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 	public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this.jobConfiguration = new Configuration();
 		this.taskConfiguration = new Configuration();
-		this.inputs = new LinkedList<InputGate<Record>>();
+		this.inputs = new LinkedList<InputGate<DeserializationDelegate<Record>>>();
 		this.outputs = new LinkedList<OutputGate>();
 
 		this.memManager = new DefaultMemoryManager(memorySize, 1);
@@ -172,7 +173,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 
 	}
 
-	private static class MockInputGate extends InputGate<Record> {
+	private static class MockInputGate extends InputGate<DeserializationDelegate<Record>> {
 		
 		private MutableObjectIterator<Record> it;
 
@@ -182,15 +183,17 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 		}
 
 		@Override
-		public void registerRecordAvailabilityListener(final RecordAvailabilityListener<Record> listener) {
+		public void registerRecordAvailabilityListener(final RecordAvailabilityListener<DeserializationDelegate<Record>> listener) {
 			super.registerRecordAvailabilityListener(listener);
 			this.notifyRecordIsAvailable(0);
 		}
 		
 		@Override
-		public InputChannelResult readRecord(Record target) throws IOException, InterruptedException {
+		public InputChannelResult readRecord(DeserializationDelegate<Record> target) throws IOException, InterruptedException {
 
-			if ((target = it.next(target)) != null) {
+			Record reuse = target != null ? target.getInstance() : null;
+			
+			if ((reuse = it.next(reuse)) != null) {
 				// everything comes from the same source channel and buffer in this mock
 				notifyRecordIsAvailable(0);
 				return InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;