You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:51 UTC
[27/63] [abbrv] git commit: [FLINK-1094] Reworked, improved,
and tested split assigners
[FLINK-1094] Reworked, improved, and tested split assigners
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c32569ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c32569ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c32569ae
Branch: refs/heads/master
Commit: c32569aed12ffa968e2c2289c2d56db262c0eba4
Parents: 028fcf5
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 18 04:22:00 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:48 2014 +0200
----------------------------------------------------------------------
.../org/apache/flink/core/fs/BlockLocation.java | 10 +-
.../flink/core/fs/FileChannelWrapper.java | 239 ------------
.../apache/flink/core/fs/FileInputSplit.java | 148 +++-----
.../apache/flink/core/io/GenericInputSplit.java | 39 +-
.../org/apache/flink/core/io/InputSplit.java | 5 +-
.../flink/core/io/InputSplitAssigner.java | 35 ++
.../flink/core/io/LocatableInputSplit.java | 96 +++--
.../DefaultInputSplitAssigner.java | 96 ++---
.../splitassigner/InputSplitAssigner.java | 59 ---
.../splitassigner/InputSplitManager.java | 266 -------------
.../splitassigner/InputSplitTracker.java | 166 --------
.../splitassigner/InputSplitWrapper.java | 20 +-
.../LocatableInputSplitAssigner.java | 225 +++++++----
.../splitassigner/LocatableInputSplitList.java | 211 -----------
.../file/FileInputSplitAssigner.java | 119 ------
.../splitassigner/file/FileInputSplitList.java | 209 ----------
.../instance/DefaultInstanceManagerTest.java | 4 -
.../instance/LocalInstanceManagerTest.java | 9 +-
.../splitassigner/DefaultSplitAssignerTest.java | 121 ++++++
.../LocatableSplitAssignerTest.java | 379 +++++++++++++++++++
20 files changed, 883 insertions(+), 1573 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java b/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
index b557c36..14056ab 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
@@ -16,22 +16,19 @@
* limitations under the License.
*/
-
package org.apache.flink.core.fs;
import java.io.IOException;
/**
- * A BlockLocation lists hosts, offset and length
- * of block.
- *
+ * A BlockLocation lists hosts, offset and length of block.
*/
public interface BlockLocation extends Comparable<BlockLocation> {
/**
* Get the list of hosts (hostname) hosting this block.
*
- * @return a list of hosts (hostname) hosting this block
+ * @return A list of hosts (hostname) hosting this block.
* @throws IOException
* thrown if the list of hosts could not be retrieved
*/
@@ -40,7 +37,7 @@ public interface BlockLocation extends Comparable<BlockLocation> {
/**
* Get the start offset of the file associated with this block.
*
- * @return the start offset of the file associated with this block
+ * @return The start offset of the file associated with this block.
*/
long getOffset();
@@ -50,5 +47,4 @@ public interface BlockLocation extends Comparable<BlockLocation> {
* @return the length of the block
*/
long getLength();
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-core/src/main/java/org/apache/flink/core/fs/FileChannelWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileChannelWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileChannelWrapper.java
deleted file mode 100644
index bf87642..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileChannelWrapper.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.core.fs;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-
-public final class FileChannelWrapper extends FileChannel {
-
- private final FileSystem fs;
-
- private final Path checkpointFile;
-
- private final byte[] buf;
-
- private final short replication;
-
- private FSDataOutputStream outputStream = null;
-
- private FSDataInputStream inputStream = null;
-
- private long nextExpectedWritePosition = 0L;
-
- private long nextExpectedReadPosition = 0L;
-
- public FileChannelWrapper(final FileSystem fs, final Path checkpointFile, final int bufferSize,
- final short replication) {
-
- this.fs = fs;
- this.checkpointFile = checkpointFile;
- this.buf = new byte[bufferSize];
- this.replication = replication;
- }
-
-
- @Override
- public void force(final boolean metaData) throws IOException {
-
- throw new UnsupportedOperationException("Method force is not implemented");
- }
-
-
- @Override
- public FileLock lock(final long position, final long size, final boolean shared) throws IOException {
-
- throw new UnsupportedOperationException("Method lock is not implemented");
- }
-
-
- @Override
- public MappedByteBuffer map(final MapMode mode, final long position, final long size) throws IOException {
-
- throw new UnsupportedOperationException("Method map is not implemented");
- }
-
-
- @Override
- public long position() throws IOException {
-
- throw new UnsupportedOperationException("Method position is not implemented");
- }
-
-
- @Override
- public FileChannel position(final long newPosition) throws IOException {
-
- throw new UnsupportedOperationException("Method position is not implemented");
- }
-
-
- @Override
- public int read(final ByteBuffer dst) throws IOException {
-
- throw new UnsupportedOperationException("Method read is not implemented");
- }
-
-
- @Override
- public int read(final ByteBuffer dst, final long position) throws IOException {
-
- final int length = Math.min(this.buf.length, dst.remaining());
-
- final FSDataInputStream inputStream = getInputStream();
- if (position != this.nextExpectedReadPosition) {
- System.out.println("Next expected position is " + this.nextExpectedReadPosition + ", seeking to "
- + position);
- inputStream.seek(position);
- this.nextExpectedReadPosition = position;
- }
-
- final int bytesRead = inputStream.read(this.buf, 0, length);
- if (bytesRead == -1) {
- return -1;
- }
-
- dst.put(this.buf, 0, length);
-
- this.nextExpectedReadPosition += bytesRead;
-
- return bytesRead;
- }
-
-
- @Override
- public long read(final ByteBuffer[] dsts, final int offset, final int length) throws IOException {
-
- throw new UnsupportedOperationException("Method read is not implemented");
- }
-
-
- @Override
- public long size() throws IOException {
-
- throw new UnsupportedOperationException("Method size is not implemented");
- }
-
-
- @Override
- public long transferFrom(final ReadableByteChannel src, final long position, final long count) throws IOException {
-
- throw new UnsupportedOperationException("Method transferFrom is not implemented");
- }
-
-
- @Override
- public long transferTo(final long position, final long count, final WritableByteChannel target) throws IOException {
-
- throw new UnsupportedOperationException("Method transferTo is not implemented");
- }
-
-
- @Override
- public FileChannel truncate(final long size) throws IOException {
-
- throw new UnsupportedOperationException("Method truncate is not implemented");
- }
-
-
- @Override
- public FileLock tryLock(final long position, final long size, final boolean shared) throws IOException {
-
- throw new UnsupportedOperationException("Method tryLock is not implemented");
- }
-
-
- @Override
- public int write(final ByteBuffer src) throws IOException {
-
- return write(src, this.nextExpectedWritePosition);
- }
-
-
- @Override
- public int write(final ByteBuffer src, final long position) throws IOException {
-
- if (position != this.nextExpectedWritePosition) {
- throw new IOException("Next expected write position is " + this.nextExpectedWritePosition);
- }
-
- final FSDataOutputStream outputStream = getOutputStream();
-
- int totalBytesWritten = 0;
-
- while (src.hasRemaining()) {
-
- final int length = Math.min(this.buf.length, src.remaining());
- src.get(this.buf, 0, length);
- outputStream.write(this.buf, 0, length);
- totalBytesWritten += length;
- }
-
- this.nextExpectedWritePosition += totalBytesWritten;
-
- return totalBytesWritten;
- }
-
-
- @Override
- public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
-
- throw new UnsupportedOperationException("Method write is not implemented");
- }
-
- private FSDataOutputStream getOutputStream() throws IOException {
-
- if (this.outputStream == null) {
- this.outputStream = this.fs.create(this.checkpointFile, false, this.buf.length, this.replication,
- this.fs.getDefaultBlockSize());
- }
-
- return this.outputStream;
- }
-
- private FSDataInputStream getInputStream() throws IOException {
-
- if (this.inputStream == null) {
- this.inputStream = this.fs.open(this.checkpointFile, this.buf.length);
- }
-
- return this.inputStream;
- }
-
-
- @Override
- protected void implCloseChannel() throws IOException {
-
- if (this.outputStream != null) {
- this.outputStream.close();
- this.outputStream = null;
- }
-
- if (this.inputStream != null) {
- this.inputStream.close();
- this.inputStream = null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
index 40058c0..c4a69ba 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
@@ -16,28 +16,21 @@
* limitations under the License.
*/
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership.
- */
-
package org.apache.flink.core.fs;
import java.io.IOException;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.StringRecord;
+import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
* A file input split provides information on a particular part of a file, possibly
- * hosted on a distributed file system and replicated among several hosts.
- *
+ * hosted on a distributed file system and replicated among several hosts.
*/
-public class FileInputSplit implements InputSplit {
+public class FileInputSplit extends LocatableInputSplit {
+
+ private static final long serialVersionUID = 1L;
/**
* The path of the file this file split refers to.
@@ -54,16 +47,8 @@ public class FileInputSplit implements InputSplit {
*/
private long length;
- /**
- * List of hosts (hostnames) containing the block, possibly <code>null</code>.
- */
- private String[] hosts;
-
- /**
- * The logical number of the split.
- */
- private int partitionNumber;
-
+ // --------------------------------------------------------------------------------------------
+
/**
* Constructs a split with host information.
*
@@ -78,20 +63,21 @@ public class FileInputSplit implements InputSplit {
* @param hosts
* the list of hosts containing the block, possibly <code>null</code>
*/
- public FileInputSplit(final int num, final Path file, final long start, final long length, final String[] hosts) {
- this.partitionNumber = num;
+ public FileInputSplit(int num, Path file, long start, long length, String[] hosts) {
+ super(num, hosts);
+
this.file = file;
this.start = start;
this.length = length;
- this.hosts = hosts;
}
/**
- * Constructor used to reconstruct the object at the receiver of an RPC call.
+ * Default constructor for deserialization.
*/
- public FileInputSplit() {
- }
+ public FileInputSplit() {}
+ // --------------------------------------------------------------------------------------------
+
/**
* Returns the path of the file containing this split's data.
*
@@ -118,35 +104,17 @@ public class FileInputSplit implements InputSplit {
public long getLength() {
return length;
}
-
- /**
- * Gets the names of the hosts that this file split resides on.
- *
- * @return The names of the hosts that this file split resides on.
- */
- public String[] getHostNames() {
- if (this.hosts == null) {
- return new String[] {};
- } else {
- return this.hosts;
- }
- }
+
+ // --------------------------------------------------------------------------------------------
@Override
- public int getSplitNumber() {
- return this.partitionNumber;
- }
-
- @Override
- public String toString() {
- return "[" + this.partitionNumber + "] " + file + ":" + start + "+" + length;
- }
-
- @Override
- public void write(final DataOutputView out) throws IOException {
- // write partition number
- out.writeInt(this.partitionNumber);
+ public void write(DataOutputView out) throws IOException {
+ super.write(out);
+ // write start and length
+ out.writeLong(this.start);
+ out.writeLong(this.length);
+
// write file
if (this.file != null) {
out.writeBoolean(true);
@@ -154,48 +122,56 @@ public class FileInputSplit implements InputSplit {
} else {
out.writeBoolean(false);
}
-
- // write start and length
- out.writeLong(this.start);
- out.writeLong(this.length);
-
- // write hosts
- if (this.hosts == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- out.writeInt(this.hosts.length);
- for (int i = 0; i < this.hosts.length; i++) {
- StringRecord.writeString(out, this.hosts[i]);
- }
- }
}
-
@Override
- public void read(final DataInputView in) throws IOException {
- // read partition number
- this.partitionNumber = in.readInt();
-
+ public void read(DataInputView in) throws IOException {
+ super.read(in);
+
+ this.start = in.readLong();
+ this.length = in.readLong();
+
// read file path
boolean isNotNull = in.readBoolean();
if (isNotNull) {
this.file = new Path();
this.file.read(in);
}
-
- this.start = in.readLong();
- this.length = in.readLong();
-
- isNotNull = in.readBoolean();
- if (isNotNull) {
- final int numHosts = in.readInt();
- this.hosts = new String[numHosts];
- for (int i = 0; i < numHosts; i++) {
- this.hosts[i] = StringRecord.readString(in);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return getSplitNumber() ^ (file == null ? 0 : file.hashCode());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ else if (obj != null && super.equals(obj) && obj instanceof FileInputSplit) {
+ FileInputSplit other = (FileInputSplit) obj;
+
+ if (this.file != null) {
+ if (!this.file.equals(other.file)) {
+ return false;
+ }
}
- } else {
- this.hosts = null;
+ else if (other.file != null) {
+ return false;
+ }
+
+ return this.start == other.start && this.length == other.length;
+ }
+ else {
+ return false;
}
}
+
+ @Override
+ public String toString() {
+ return '[' + getSplitNumber() + "] " + file + ":" + start + "+" + length;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
index ceecfa5..850ba1c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.core.io;
import java.io.IOException;
@@ -27,17 +26,19 @@ import org.apache.flink.core.memory.DataOutputView;
/**
* A generic input split that has only a partition number.
*/
-public class GenericInputSplit implements InputSplit {
+public class GenericInputSplit implements InputSplit, java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
/**
* The number of this split.
*/
- protected int partitionNumber;
+ private int partitionNumber;
/**
* The total number of partitions
*/
- protected int totalNumberOfPartitions;
+ private int totalNumberOfPartitions;
// --------------------------------------------------------------------------------------------
@@ -60,6 +61,17 @@ public class GenericInputSplit implements InputSplit {
// --------------------------------------------------------------------------------------------
@Override
+ public int getSplitNumber() {
+ return this.partitionNumber;
+ }
+
+ public int getTotalNumberOfSplits() {
+ return this.totalNumberOfPartitions;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
public void write(DataOutputView out) throws IOException {
out.writeInt(this.partitionNumber);
out.writeInt(this.totalNumberOfPartitions);
@@ -70,16 +82,25 @@ public class GenericInputSplit implements InputSplit {
this.partitionNumber = in.readInt();
this.totalNumberOfPartitions = in.readInt();
}
+
+ // --------------------------------------------------------------------------------------------
@Override
- public int getSplitNumber() {
- return this.partitionNumber;
+ public int hashCode() {
+ return this.partitionNumber ^ this.totalNumberOfPartitions;
}
- public int getTotalNumberOfSplits() {
- return this.totalNumberOfPartitions;
+ @Override
+ public boolean equals(Object obj) {
+ if (obj != null && obj instanceof GenericInputSplit) {
+ GenericInputSplit other = (GenericInputSplit) obj;
+ return this.partitionNumber == other.partitionNumber &&
+ this.totalNumberOfPartitions == other.totalNumberOfPartitions;
+ } else {
+ return false;
+ }
}
-
+
public String toString() {
return "GenericSplit (" + this.partitionNumber + "/" + this.totalNumberOfPartitions + ")";
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
index 5f09d10..5ce0378 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
@@ -16,14 +16,13 @@
* limitations under the License.
*/
-
package org.apache.flink.core.io;
-
/**
* This interface must be implemented by all kind of input splits that can be assigned to input formats.
*/
-public interface InputSplit extends IOReadableWritable {
+public interface InputSplit extends IOReadableWritable, java.io.Serializable {
+
/**
* Returns the number of this input split.
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java
new file mode 100644
index 0000000..f01c9f3
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+/**
+ * An input split assigner distributes the {@link InputSplit}s among the instances on which a
+ * data source exists.
+ */
+public interface InputSplitAssigner {
+
+ /**
+ * Returns the next input split that shall be consumed. The consumer's host is passed as a parameter
+ * to allow localized assignments.
+ *
+ * @param host The address of the host to assign the split to.
+ * @return the next input split to be consumed, or <code>null</code> if no more splits remain.
+ */
+ InputSplit getNextInputSplit(String host);
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
index 9a4e366..cc36d99 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
@@ -16,20 +16,23 @@
* limitations under the License.
*/
-
package org.apache.flink.core.io;
import java.io.IOException;
+import java.util.Arrays;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-
/**
* A locatable input split is an input split referring to input data which is located on one or more hosts.
*/
-public class LocatableInputSplit implements InputSplit {
+public class LocatableInputSplit implements InputSplit, java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private static final String[] EMPTY_ARR = new String[0];
+
/**
* The number of the split.
*/
@@ -40,6 +43,8 @@ public class LocatableInputSplit implements InputSplit {
*/
private String[] hostnames;
+ // --------------------------------------------------------------------------------------------
+
/**
* Creates a new locatable input split.
*
@@ -48,73 +53,86 @@ public class LocatableInputSplit implements InputSplit {
* @param hostnames
* the names of the hosts storing the data this input split refers to
*/
- public LocatableInputSplit(final int splitNumber, final String[] hostnames) {
-
- this.hostnames = hostnames;
+ public LocatableInputSplit(int splitNumber, String[] hostnames) {
+ this.splitNumber = splitNumber;
+ this.hostnames = hostnames == null ? EMPTY_ARR : hostnames;
+ }
+
+ public LocatableInputSplit(int splitNumber, String hostname) {
+ this.splitNumber = splitNumber;
+ this.hostnames = hostname == null ? EMPTY_ARR : new String[] { hostname };
}
/**
* Default constructor for serialization/deserialization.
*/
- public LocatableInputSplit() {
- }
+ public LocatableInputSplit() {}
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int getSplitNumber() {
+ return this.splitNumber;
+ }
+
/**
* Returns the names of the hosts storing the data this input split refers to
*
* @return the names of the hosts storing the data this input split refers to
*/
public String[] getHostnames() {
-
- if (this.hostnames == null) {
- return new String[] {};
- }
-
return this.hostnames;
}
+ // --------------------------------------------------------------------------------------------
@Override
- public void write(final DataOutputView out) throws IOException {
-
- // Write the split number
+ public void write(DataOutputView out) throws IOException {
out.writeInt(this.splitNumber);
-
- // Write hostnames
- if (this.hostnames == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- out.writeInt(this.hostnames.length);
- for (int i = 0; i < this.hostnames.length; i++) {
- StringRecord.writeString(out, this.hostnames[i]);
- }
+ out.writeInt(this.hostnames.length);
+ for (int i = 0; i < this.hostnames.length; i++) {
+ StringRecord.writeString(out, this.hostnames[i]);
}
}
-
@Override
- public void read(final DataInputView in) throws IOException {
-
- // Read the split number
+ public void read(DataInputView in) throws IOException {
this.splitNumber = in.readInt();
- // Read hostnames
- if (in.readBoolean()) {
- final int numHosts = in.readInt();
+ final int numHosts = in.readInt();
+ if (numHosts == 0) {
+ this.hostnames = EMPTY_ARR;
+ } else {
this.hostnames = new String[numHosts];
for (int i = 0; i < numHosts; i++) {
this.hostnames[i] = StringRecord.readString(in);
}
- } else {
- this.hostnames = null;
}
}
-
-
+
+ // --------------------------------------------------------------------------------------------
+
@Override
- public int getSplitNumber() {
-
+ public int hashCode() {
return this.splitNumber;
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ else if (obj != null && obj instanceof LocatableInputSplit) {
+ LocatableInputSplit other = (LocatableInputSplit) obj;
+ return other.splitNumber == this.splitNumber && Arrays.deepEquals(other.hostnames, this.hostnames);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Locatable Split (" + splitNumber + ") at " + Arrays.toString(this.hostnames);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java
index 916772a..44dae64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java
@@ -16,88 +16,60 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.jobmanager.splitassigner;
-import java.util.Arrays;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.core.io.InputSplitAssigner;
/**
* This is the default implementation of the {@link InputSplitAssigner} interface. The default input split assigner
- * simply returns all input splits of an input vertex in the order they were originally computed. The default input
- * split assigner is always used when a more specific {@link InputSplitAssigned} could not be found.
- * <p>
- * This class is thread-safe.
- *
+ * simply returns all input splits of an input vertex in the order they were originally computed.
*/
public class DefaultInputSplitAssigner implements InputSplitAssigner {
- /**
- * The logging object used to report information and errors.
- */
+ /** The logging object used to report information and errors. */
private static final Logger LOG = LoggerFactory.getLogger(DefaultInputSplitAssigner.class);
- /**
- * The split map stores a list of all input splits that still must be consumed by a specific input vertex.
- */
- private final ConcurrentMap<ExecutionGroupVertex, Queue<InputSplit>> splitMap = new ConcurrentHashMap<ExecutionGroupVertex, Queue<InputSplit>>();
-
-
- @Override
- public void registerGroupVertex(final ExecutionGroupVertex groupVertex) {
-
- final InputSplit[] inputSplits = groupVertex.getInputSplits();
+ /** The list of all splits */
+ private final List<InputSplit> splits = new ArrayList<InputSplit>();
- if (inputSplits == null) {
- return;
- }
-
- if (inputSplits.length == 0) {
- return;
- }
-
- final Queue<InputSplit> queue = new ConcurrentLinkedQueue<InputSplit>();
- if (this.splitMap.putIfAbsent(groupVertex, queue) != null) {
- LOG.error("Group vertex " + groupVertex.getName() + " already has a split queue");
- }
- queue.addAll(Arrays.asList(inputSplits));
+ public DefaultInputSplitAssigner(InputSplit[] splits) {
+ Collections.addAll(this.splits, splits);
}
-
-
- @Override
- public void unregisterGroupVertex(final ExecutionGroupVertex groupVertex) {
-
- this.splitMap.remove(groupVertex);
+
+ public DefaultInputSplitAssigner(Collection<? extends InputSplit> splits) {
+ this.splits.addAll(splits);
}
-
-
+
+
@Override
- public InputSplit getNextInputSplit(final ExecutionVertex vertex) {
-
- final Queue<InputSplit> queue = this.splitMap.get(vertex.getGroupVertex());
- if (queue == null) {
- final JobID jobID = vertex.getExecutionGraph().getJobID();
- LOG.error("Cannot find split queue for vertex " + vertex.getGroupVertex() + " (job " + jobID + ")");
- return null;
+ public InputSplit getNextInputSplit(String host) {
+ InputSplit next = null;
+
+ // keep the synchronized part short
+ synchronized (this.splits) {
+ if (this.splits.size() > 0) {
+ next = this.splits.remove(this.splits.size() - 1);
+ }
}
-
- InputSplit nextSplit = queue.poll();
-
- if (LOG.isDebugEnabled() && nextSplit != null) {
- LOG.debug("Assigning split " + nextSplit.getSplitNumber() + " to " + vertex);
+
+ if (LOG.isDebugEnabled()) {
+ if (next == null) {
+ LOG.debug("Assigning split " + next + " to " + host);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No more input splits available");
+ }
+ }
}
-
- return nextSplit;
+ return next;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitAssigner.java
deleted file mode 100644
index 70e7ef9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitAssigner.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.splitassigner;
-
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-
-/**
- * The input split assigner interface must be implemented by every component which is supposed to dynamically offer
- * input splits to the input vertices of a job at runtime.
- *
- */
-public interface InputSplitAssigner {
-
- /**
- * Registers an input vertex with the input split assigner.
- *
- * @param groupVertex
- * the input vertex to register
- */
- void registerGroupVertex(ExecutionGroupVertex groupVertex);
-
- /**
- * Unregisters an input vertex from the input split assigner. All resources allocated to the input vertex are freed
- * as part of this operation.
- *
- * @param groupVertex
- * the input vertex to unregister
- */
- void unregisterGroupVertex(ExecutionGroupVertex groupVertex);
-
- /**
- * Returns the next input split that shall be consumed by the given input vertex.
- *
- * @param vertex
- * the vertex for which the next input split to be consumed shall be determined
- * @return the next input split to be consumed or <code>null</code> if no more splits shall be consumed by the given
- * vertex
- */
- InputSplit getNextInputSplit(ExecutionVertex vertex);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitManager.java
deleted file mode 100644
index d9de4ee..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitManager.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.splitassigner;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertexIterator;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitAssigner;
-import org.apache.flink.util.StringUtils;
-
-/**
- * The input split manager is responsible for serving input splits to {@link AbstractInputTask} objects at runtime.
- * Before passed on to the {@link org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler}, an {@link ExecutionGraph} is registered with the input split
- * manager and all included input vertices of the graph register their generated input splits with the manager. Each
- * type of input split can be assigned to a specific {@link InputSplitAssigner} which is loaded by the input split
- * manager at runtime.
- * <p>
- * This class is thread-safe.
- */
-public final class InputSplitManager {
-
- /**
- * The logging object which is used to report information and errors.
- */
- private static final Logger LOG = LoggerFactory.getLogger(InputSplitManager.class);
-
- /**
- * The prefix of the configuration key which is used to retrieve the class names of the individual
- * {@link InputSplitAssigner} classes
- */
- private static final String INPUT_SPLIT_CONFIG_KEY_PREFIX = "inputsplit.assigner.";
-
- /**
- * A cache which stores the mapping of group vertices to assigner objects for fast retrieval during the job
- * execution.
- */
- private final Map<ExecutionGroupVertex, InputSplitAssigner> assignerCache = new ConcurrentHashMap<ExecutionGroupVertex, InputSplitAssigner>();
-
- /**
- * A map holding an instance of each available {@link InputSplitAssigner}, accessible via the class name of the
- * corresponding split type.
- */
- private final Map<Class<? extends InputSplit>, InputSplitAssigner> loadedAssigners = new HashMap<Class<? extends InputSplit>, InputSplitAssigner>();
-
- /**
- * The input split tracker makes sure that a vertex retrieves the same sequence of input splits after being
- * restarted.
- */
- private final InputSplitTracker inputSplitTracker = new InputSplitTracker();
-
- /**
- * The default input split assigner which is always used if a more specific assigner cannot be found.
- */
- private final InputSplitAssigner defaultAssigner = new DefaultInputSplitAssigner();
-
- /**
- * Registers a new job represented by its {@link ExecutionGraph} with the input split manager.
- *
- * @param executionGraph
- * the job to be registered
- */
- public void registerJob(final ExecutionGraph executionGraph) {
-
- final Iterator<ExecutionGroupVertex> it = new ExecutionGroupVertexIterator(executionGraph, true, -1);
- while (it.hasNext()) {
-
- final ExecutionGroupVertex groupVertex = it.next();
- final InputSplit[] inputSplits = groupVertex.getInputSplits();
-
- if (inputSplits == null) {
- continue;
- }
-
- if (inputSplits.length == 0) {
- continue;
- }
-
- final InputSplitAssigner assigner = getAssignerByType(groupVertex.getInputSplitType(), true);
- // Add entry to cache for fast retrieval during the job execution
- this.assignerCache.put(groupVertex, assigner);
-
- assigner.registerGroupVertex(groupVertex);
- }
-
- // Register job with the input split tracker
- this.inputSplitTracker.registerJob(executionGraph);
- }
-
- /**
- * Unregisters the given job represented by its {@link ExecutionGraph} with the input split manager.
- *
- * @param executionGraph
- * the job to be unregistered
- */
- public void unregisterJob(final ExecutionGraph executionGraph) {
-
- final Iterator<ExecutionGroupVertex> it = new ExecutionGroupVertexIterator(executionGraph, true, -1);
- while (it.hasNext()) {
-
- final ExecutionGroupVertex groupVertex = it.next();
- final InputSplit[] inputSplits = groupVertex.getInputSplits();
-
- if (inputSplits == null) {
- continue;
- }
-
- if (inputSplits.length == 0) {
- continue;
- }
-
- final InputSplitAssigner assigner = this.assignerCache.remove(groupVertex);
- if (assigner == null) {
- LOG.error("Group vertex " + groupVertex.getName()
- + " is unregistered, but cannot be found in assigner cache");
- continue;
- }
-
- assigner.unregisterGroupVertex(groupVertex);
- }
-
- // Unregister job from input split tracker
- this.inputSplitTracker.unregisterJob(executionGraph);
- }
-
- /**
- * Returns the next input split the input split manager (or the responsible {@link InputSplitAssigner} to be more
- * precise) has chosen for the given vertex to consume.
- *
- * @param vertex
- * the vertex for which the next input split is to be determined
- * @param sequenceNumber
- * the sequence number of the vertex's request
- * @return the next input split to consume or <code>null</code> if the vertex shall consume no more input splits
- */
- public InputSplit getNextInputSplit(final ExecutionVertex vertex, final int sequenceNumber) {
-
- InputSplit nextInputSplit = this.inputSplitTracker.getInputSplitFromLog(vertex, sequenceNumber);
- if (nextInputSplit != null) {
- LOG.info("Input split " + nextInputSplit.getSplitNumber() + " for vertex " + vertex + " replayed from log");
- return nextInputSplit;
- }
-
- final ExecutionGroupVertex groupVertex = vertex.getGroupVertex();
- final InputSplitAssigner inputSplitAssigner = this.assignerCache.get(groupVertex);
- if (inputSplitAssigner == null) {
- final JobID jobID = groupVertex.getExecutionStage().getExecutionGraph().getJobID();
- LOG.error("Cannot find input assigner for group vertex " + groupVertex.getName() + " (job " + jobID + ")");
- return null;
- }
-
- nextInputSplit = inputSplitAssigner.getNextInputSplit(vertex);
- if (nextInputSplit != null) {
- this.inputSplitTracker.addInputSplitToLog(vertex, sequenceNumber, nextInputSplit);
- LOG.info(vertex + " receives input split " + nextInputSplit.getSplitNumber());
- }
-
- return nextInputSplit;
- }
-
- /**
- * Returns the {@link InputSplitAssigner} which is defined for the given type of input split.
- *
- * @param inputSplitType
- * the type of input split to find the corresponding {@link InputSplitAssigner} for
- * @param allowLoading
- * <code>true</code> to indicate that the input split assigner is allowed to load additional classes if
- * necessary, <code>false</code> otherwise
- * @return the {@link InputSplitAssigner} responsible for the given type of input split
- */
- private InputSplitAssigner getAssignerByType(final Class<? extends InputSplit> inputSplitType,
- final boolean allowLoading) {
-
- synchronized (this.loadedAssigners) {
-
- InputSplitAssigner assigner = this.loadedAssigners.get(inputSplitType);
- if (assigner == null && allowLoading) {
- assigner = loadInputSplitAssigner(inputSplitType);
- if (assigner != null) {
- this.loadedAssigners.put(inputSplitType, assigner);
- }
- }
-
- if (assigner != null) {
- return assigner;
- }
- }
-
- LOG.warn("Unable to find specific input split provider for type " + inputSplitType.getName()
- + ", using default assigner");
-
- return this.defaultAssigner;
- }
-
- /**
- * Attempts to find the responsible type of {@link InputSplitAssigner} for the given type of input split from the
- * configuration and instantiate an object for it.
- *
- * @param inputSplitType
- * the type of input split to load the {@link InputSplitAssigner} for
- * @return the newly loaded {@link InputSplitAssigner} object or <code>null</code> if no such object could be
- * located or loaded
- */
- private InputSplitAssigner loadInputSplitAssigner(final Class<? extends InputSplit> inputSplitType) {
-
- final String className = inputSplitType.getName();
-
- final String assignerKey = INPUT_SPLIT_CONFIG_KEY_PREFIX + className;
- LOG.info("Trying to load input split assigner for type " + className);
-
- String assignerClassName = GlobalConfiguration.getString(assignerKey, null);
-
- // Provide hard-wired default configuration for FileInputSplit objects to make configuration more robust
- if (assignerClassName == null) {
- if (FileInputSplit.class == inputSplitType) {
- return new FileInputSplitAssigner();
- }
- else if (GenericInputSplit.class == inputSplitType) {
- return new DefaultInputSplitAssigner();
- }
- else {
- return null;
- }
- }
-
- try {
- final Class<? extends InputSplitAssigner> assignerClass =
- Class.forName(assignerClassName).asSubclass(InputSplitAssigner.class);
- return assignerClass.newInstance();
- }
- catch (Exception e) {
- LOG.error(StringUtils.stringifyException(e));
- }
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitTracker.java
deleted file mode 100644
index 013fbec..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitTracker.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.splitassigner;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphIterator;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertexIterator;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
-
-/**
- * The input split tracker maintains a log of all the input splits that are handed out to the individual input vertices.
- * In case of an input vertex must be restarted the input split tracker makes sure that the vertex receives the same
- * sequence of input splits as in its original run up to the point that it crashed.
- * <p>
- * This class is thread-safe.
- *
- */
-final class InputSplitTracker {
-
- /**
- * The logging object which is used to report information and errors.
- */
- private static final Logger LOG = LoggerFactory.getLogger(InputSplitTracker.class);
-
- /**
- * The central split map which stores the logs of the individual input vertices.
- */
- private final ConcurrentMap<ExecutionVertexID, List<InputSplit>> splitMap = new ConcurrentHashMap<ExecutionVertexID, List<InputSplit>>();
-
- /**
- * Constructor with package visibility only.
- */
- InputSplitTracker() {
- }
-
- /**
- * Registers a new job with the input split tracker.
- *
- * @param eg
- * the execution graph of the job to be registered
- */
- void registerJob(final ExecutionGraph eg) {
-
- final Iterator<ExecutionGroupVertex> it = new ExecutionGroupVertexIterator(eg, true, -1);
- while (it.hasNext()) {
-
- final ExecutionGroupVertex groupVertex = it.next();
- final InputSplit[] inputSplits = groupVertex.getInputSplits();
-
- if (inputSplits == null) {
- continue;
- }
-
- if (inputSplits.length == 0) {
- continue;
- }
-
- for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
- final ExecutionVertex vertex = groupVertex.getGroupMember(i);
- if (this.splitMap.put(vertex.getID(), new ArrayList<InputSplit>()) != null) {
- LOG.error("InputSplitTracker must keep track of two vertices with ID " + vertex.getID());
- }
- }
- }
- }
-
- /**
- * Unregisters a job from the input split tracker.
- *
- * @param eg
- * the execution graph of the job to be unregistered
- */
- void unregisterJob(final ExecutionGraph eg) {
-
- final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(eg, true);
- while (it.hasNext()) {
- this.splitMap.remove(it.next().getID());
- }
- }
-
- /**
- * Returns the input split with the given sequence number from the specified vertex's log or <code>null</code> if no
- * such input split exists.
- *
- * @param vertex
- * the vertex for which the input split shall be returned from the log
- * @param sequenceNumber
- * the sequence number identifying the log entry
- * @return the input split that was stored under the given sequence number of the vertex's log or <code>null</code>
- * if no such input split exists
- */
- InputSplit getInputSplitFromLog(final ExecutionVertex vertex, final int sequenceNumber) {
-
- final List<InputSplit> inputSplitLog = this.splitMap.get(vertex.getID());
- if (inputSplitLog == null) {
- LOG.error("Cannot find input split log for vertex " + vertex + " (" + vertex.getID() + ")");
- return null;
- }
-
- synchronized (inputSplitLog) {
-
- if (sequenceNumber < inputSplitLog.size()) {
- return inputSplitLog.get(sequenceNumber);
- }
- }
-
- return null;
- }
-
- /**
- * Adds the given input split to the vertex's log and stores it under the specified sequence number.
- *
- * @param vertex
- * the vertex for which the input split shall be stored
- * @param sequenceNumber
- * the sequence number identifying the log entry under which the input split shall be stored
- * @param inputSplit
- * the input split to be stored
- */
- void addInputSplitToLog(final ExecutionVertex vertex, final int sequenceNumber, final InputSplit inputSplit) {
-
- final List<InputSplit> inputSplitLog = this.splitMap.get(vertex.getID());
- if (inputSplitLog == null) {
- LOG.error("Cannot find input split log for vertex " + vertex + " (" + vertex.getID() + ")");
- return;
- }
-
- synchronized (inputSplitLog) {
- if (inputSplitLog.size() != sequenceNumber) {
- LOG.error("Expected input split with sequence number " + inputSplitLog.size() + " for vertex " + vertex
- + " (" + vertex.getID() + ") but received " + sequenceNumber + ", skipping...");
- return;
- }
-
- inputSplitLog.add(inputSplit);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitWrapper.java
index 6713714..958fa4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitWrapper.java
@@ -28,12 +28,10 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.util.StringUtils;
/**
* An input split wrapper object wraps an input split for RPC calls. In particular, the input split wrapper ensures that
* the right class loader is used to instantiate the wrapped input split object.
- *
*/
public final class InputSplitWrapper implements IOReadableWritable {
@@ -45,7 +43,7 @@ public final class InputSplitWrapper implements IOReadableWritable {
/**
* The wrapped input split.
*/
- private InputSplit inputSplit = null;
+ private InputSplit inputSplit;
/**
* Constructs a new input split wrapper.
@@ -55,10 +53,9 @@ public final class InputSplitWrapper implements IOReadableWritable {
* @param inputSplit
* the input split to be wrapped
*/
- public InputSplitWrapper(final JobID jobID, final InputSplit inputSplit) {
-
+ public InputSplitWrapper(JobID jobID, InputSplit inputSplit) {
if (jobID == null) {
- throw new IllegalArgumentException("Argument jobID must not be null");
+ throw new NullPointerException();
}
this.jobID = jobID;
@@ -74,7 +71,7 @@ public final class InputSplitWrapper implements IOReadableWritable {
@Override
- public void write(final DataOutputView out) throws IOException {
+ public void write(DataOutputView out) throws IOException {
// Write the job ID
this.jobID.write(out);
@@ -96,7 +93,7 @@ public final class InputSplitWrapper implements IOReadableWritable {
@SuppressWarnings("unchecked")
@Override
- public void read(final DataInputView in) throws IOException {
+ public void read(DataInputView in) throws IOException {
// Read the job ID
this.jobID.read(in);
@@ -117,15 +114,15 @@ public final class InputSplitWrapper implements IOReadableWritable {
try {
splitClass = (Class<? extends InputSplit>) Class.forName(className, true, cl);
} catch (ClassNotFoundException e) {
- throw new IOException(StringUtils.stringifyException(e));
+ throw new IOException(e);
}
try {
this.inputSplit = splitClass.newInstance();
} catch (InstantiationException e) {
- throw new IOException(StringUtils.stringifyException(e));
+ throw new IOException(e);
} catch (IllegalAccessException e) {
- throw new IOException(StringUtils.stringifyException(e));
+ throw new IOException(e);
}
// Read the input split itself
@@ -142,7 +139,6 @@ public final class InputSplitWrapper implements IOReadableWritable {
* @return the wrapped input split, possibly <code>null</code>
*/
public InputSplit getInputSplit() {
-
return this.inputSplit;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java
index 6a45866..08b5b65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java
@@ -16,105 +16,182 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.jobmanager.splitassigner;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
/**
- * The locatable input split assigner is a specific implementation of the {@link InputSplitAssigner} interface for
- * {@link LocatableInputSplit} objects. The locatable input split assigner offers to take the storage location of the
- * individual locatable input splits into account. It attempts to always assign the splits to vertices in a way that the
- * data locality is preserved as well as possible.
- * <p>
- * This class is thread-safe.
- *
+ * The locatable input split assigner assigns to each host splits that are local, before assigning
+ * splits that are not local.
*/
public final class LocatableInputSplitAssigner implements InputSplitAssigner {
- /**
- * The logging object which is used to report information and errors.
- */
private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitAssigner.class);
- private final ConcurrentMap<ExecutionGroupVertex, LocatableInputSplitList> vertexMap = new ConcurrentHashMap<ExecutionGroupVertex, LocatableInputSplitList>();
-
- @Override
- public void registerGroupVertex(final ExecutionGroupVertex groupVertex) {
+ private final Set<LocatableInputSplit> unassigned = new HashSet<LocatableInputSplit>();
+
+ private final ConcurrentHashMap<String, List<LocatableInputSplit>> localPerHost = new ConcurrentHashMap<String, List<LocatableInputSplit>>();
+
+ private int localAssignments; // protected by the lock on the unassigned set
+
+ private int remoteAssignments; // protected by the lock on the unassigned set
- if (!LocatableInputSplit.class.isAssignableFrom(groupVertex.getInputSplitType())) {
- LOG.error(groupVertex.getName() + " produces input splits of type " + groupVertex.getInputSplitType()
- + " and cannot be handled by this split assigner");
- return;
- }
+ // --------------------------------------------------------------------------------------------
+
+ public LocatableInputSplitAssigner(Collection<LocatableInputSplit> splits) {
+ this.unassigned.addAll(splits);
+ }
+
+ public LocatableInputSplitAssigner(LocatableInputSplit[] splits) {
+ Collections.addAll(this.unassigned, splits);
+ }
+
+ // --------------------------------------------------------------------------------------------
- // Ignore vertices that do not produce splits
- final InputSplit[] inputSplits = groupVertex.getInputSplits();
- if (inputSplits == null) {
- return;
+ @Override
+ public LocatableInputSplit getNextInputSplit(String host) {
+ // for a null host, we return an arbitrary split
+ if (host == null) {
+
+ synchronized (this.unassigned) {
+ Iterator<LocatableInputSplit> iter = this.unassigned.iterator();
+ if (iter.hasNext()) {
+ LocatableInputSplit next = iter.next();
+ iter.remove();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Assigning arbitrary split to null host.");
+ }
+
+ remoteAssignments++;
+ return next;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No more input splits remaining.");
+ }
+ return null;
+ }
+ }
}
-
- if (inputSplits.length == 0) {
- return;
+
+ host = host.toLowerCase(Locale.US);
+
+ // for any non-null host, we take the list of splits that are local to that host
+ List<LocatableInputSplit> localSplits = this.localPerHost.get(host);
+
+ // if we have no list for this host yet, create one
+ if (localSplits == null) {
+ localSplits = new ArrayList<LocatableInputSplit>(16);
+
+ // lock the list, to be sure that others have to wait for that host's local list
+ synchronized (localSplits) {
+ List<LocatableInputSplit> prior = this.localPerHost.putIfAbsent(host, localSplits);
+
+ // if someone else beat us in the race to create this list, then we do not populate this one, but
+ // simply work with that other list
+ if (prior == null) {
+ // we are the first, we populate
+
+ // first, copy the remaining splits to release the lock on the set early
+ // because that is shared among threads
+ LocatableInputSplit[] remaining;
+ synchronized (this.unassigned) {
+ remaining = (LocatableInputSplit[]) this.unassigned.toArray(new LocatableInputSplit[this.unassigned.size()]);
+ }
+
+ for (LocatableInputSplit is : remaining) {
+ if (isLocal(host, is.getHostnames())) {
+ localSplits.add(is);
+ }
+ }
+ }
+ else {
+ // someone else was faster
+ localSplits = prior;
+ }
+ }
}
-
- final LocatableInputSplitList splitStore = new LocatableInputSplitList();
- if (this.vertexMap.putIfAbsent(groupVertex, splitStore) != null) {
- LOG.error(groupVertex.getName()
- + " appears to be already registered with the locatable input split assigner, ignoring vertex...");
- return;
+
+ // at this point, we have a list of local splits (possibly empty)
+ // we need to make sure no one else operates in the current list (that protects against
+ // list creation races) and that the unassigned set is consistent
+ // NOTE: we need to obtain the locks in this order, strictly!!!
+ synchronized (localSplits) {
+ int size = localSplits.size();
+ if (size > 0) {
+ synchronized (this.unassigned) {
+ do {
+ --size;
+ LocatableInputSplit split = localSplits.remove(size);
+ if (this.unassigned.remove(split)) {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Assigning local split to host " + host);
+ }
+
+ localAssignments++;
+ return split;
+ }
+ } while (size > 0);
+ }
+ }
}
-
- synchronized (splitStore) {
-
- for (int i = 0; i < inputSplits.length; ++i) {
- // TODO: Improve this
- final InputSplit inputSplit = inputSplits[i];
- if (!(inputSplit instanceof LocatableInputSplit)) {
- LOG.error("Input split " + i + " of vertex " + groupVertex.getName() + " is of type "
- + inputSplit.getClass() + ", ignoring split...");
- continue;
+
+ // we did not find a local split, return any
+ synchronized (this.unassigned) {
+ Iterator<LocatableInputSplit> iter = this.unassigned.iterator();
+ if (iter.hasNext()) {
+ LocatableInputSplit next = iter.next();
+ iter.remove();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Assigning remote split to host " + host);
}
- splitStore.addSplit((LocatableInputSplit) inputSplit);
+
+ remoteAssignments++;
+ return next;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No more input splits remaining.");
+ }
+ return null;
}
-
}
}
-
-
- @Override
- public void unregisterGroupVertex(final ExecutionGroupVertex groupVertex) {
- this.vertexMap.remove(groupVertex);
- }
-
-
- @Override
- public InputSplit getNextInputSplit(final ExecutionVertex vertex) {
-
- final ExecutionGroupVertex groupVertex = vertex.getGroupVertex();
- final LocatableInputSplitList splitStore = this.vertexMap.get(groupVertex);
-
- if (splitStore == null) {
- return null;
+
+ private static final boolean isLocal(String host, String[] hosts) {
+ if (host == null || hosts == null) {
+ return false;
}
-
- final Instance instance = vertex.getAllocatedResource().getInstance();
- if (instance == null) {
- LOG.error("Instance is null, returning random split");
- return null;
+
+ for (String h : hosts) {
+ if (h != null && host.equals(h.toLowerCase())) {
+ return true;
+ }
}
-
- return splitStore.getNextInputSplit(instance);
+
+ return false;
+ }
+
+ public int getNumberOfLocalAssignments() {
+ return localAssignments;
+ }
+
+ public int getNumberOfRemoteAssignments() {
+ return remoteAssignments;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitList.java
deleted file mode 100644
index 71fbf7a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitList.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.splitassigner;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.runtime.instance.Instance;
-
-/**
- * The locatable input split list stores the locatable input splits for an input vertex that are still expected to be
- * consumed. Besides simply storing the splits, the locatable input split list also computes the distance all
- * {@link org.apache.flink.runtime.instance.Instance} objects which request an input split and its nearest storage location with respect to the
- * underlying network topology. That way input splits are always given to consuming vertices in a way that data locality
- * is preserved as well as possible.
- * <p>
- * This class is not thread-safe.
- *
- */
-public final class LocatableInputSplitList {
-
- /**
- * The logging object which is used to report information and errors.
- */
- private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitList.class);
-
- /**
- * The set containing all the locatable input splits that still must be consumed.
- */
- private Set<LocatableInputSplit> masterSet = new HashSet<LocatableInputSplit>();
-
- /**
- * The map caching the specific file input split lists for each {@link org.apache.flink.runtime.instance.Instance}.
- */
- private Map<Instance, Queue<QueueElem>> instanceMap = new HashMap<Instance, Queue<QueueElem>>();
-
- /**
- * This is an auxiliary class to store the minimum distance between a file input split's storage locations and an
- * {@link org.apache.flink.runtime.instance.Instance}.
- *
- */
- private final class QueueElem implements Comparable<QueueElem> {
-
- /**
- * The locatable input split the distance applies to.
- */
- final LocatableInputSplit inputSplit;
-
- /**
- * The minimum distance between the file input split's storage locations and the instance this object has been
- * created for.
- */
- final int distance;
-
- /**
- * Creates a new queue element.
- *
- * @param inputSplit
- * the locatable input split to be stored
- * @param distance
- * the minimum distance between the stored input split's storage locations and the instance this object
- * has been created for
- */
- private QueueElem(final LocatableInputSplit inputSplit, final int distance) {
- this.inputSplit = inputSplit;
- this.distance = distance;
- }
-
- /**
- * Returns the locatable input split stored within this object.
- *
- * @return the locatable input split
- */
- private LocatableInputSplit getInputSplit() {
- return this.inputSplit;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int compareTo(final QueueElem o) {
-
- return (this.distance - o.distance);
- }
-
- }
-
- /**
- * Adds the given locate input split to the set of locatable input splits to be consumed.
- *
- * @param locatableInputSplit
- * the locatable input split to be added
- */
- synchronized void addSplit(final LocatableInputSplit locatableInputSplit) {
-
- this.masterSet.add(locatableInputSplit);
- }
-
- /**
- * Returns the next locatable input split to be consumed by the given instance. The returned input split is selected
- * in a
- * way that the distance between the split's storage location and the requesting {@link org.apache.flink.runtime.instance.Instance} is as
- * short as possible.
- *
- * @param instance
- * the instance requesting the next file input split
- * @return the next input split to be consumed by the given instance or <code>null</code> if all input splits have
- * already been consumed.
- */
- synchronized LocatableInputSplit getNextInputSplit(final Instance instance) {
-
- final Queue<QueueElem> instanceSplitList = getInstanceSplitList(instance);
-
- while (true) {
-
- final QueueElem candidate = instanceSplitList.poll();
- if (candidate == null) {
- return null;
- }
-
- if (this.masterSet.remove(candidate.getInputSplit())) {
- if (LOG.isInfoEnabled()) {
- if (candidate.distance == 0) {
- LOG.info(instance + " receives local file input split");
- } else {
- LOG.info(instance + " receives remote file input split (distance " + candidate.distance + ")");
- }
- }
- return candidate.getInputSplit();
- }
-
- if (this.masterSet.isEmpty()) {
- return null;
- }
- }
- }
-
- /**
- * Returns a list of locatable input splits specifically ordered for the given {@link org.apache.flink.runtime.instance.Instance}. When the
- * list is initially created, it contains all the unconsumed located input splits at that point in time, ascendingly
- * ordered
- * by the minimum distance between the input splits' storage locations and the given {@link org.apache.flink.runtime.instance.Instance}.
- *
- * @param instance
- * the instance for which the locatable input split list has been computed
- * @return the list of file input splits ordered specifically for the given instance
- */
- private Queue<QueueElem> getInstanceSplitList(final Instance instance) {
-
- Queue<QueueElem> instanceSplitList = this.instanceMap.get(instance);
- if (instanceSplitList == null) {
-
- // Create and populate instance specific split list
- instanceSplitList = new PriorityQueue<LocatableInputSplitList.QueueElem>();
- final Iterator<LocatableInputSplit> it = this.masterSet.iterator();
- while (it.hasNext()) {
-
- final LocatableInputSplit split = it.next();
- final String[] hostnames = split.getHostnames();
- if (hostnames == null) {
- instanceSplitList.add(new QueueElem(split, Integer.MAX_VALUE));
-
- } else {
-
- int minDistance = Integer.MAX_VALUE;
- for (int i = 0; i < hostnames.length; ++i) {
- final int distance = instance.getDistance(hostnames[i]);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Distance between " + instance + " and " + hostnames[i] + " is " + distance);
- }
- if (distance < minDistance) {
- minDistance = distance;
- }
- }
-
- instanceSplitList.add(new QueueElem(split, minDistance));
- }
- }
-
- this.instanceMap.put(instance, instanceSplitList);
- }
-
- return instanceSplitList;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitAssigner.java
deleted file mode 100644
index 383ed38..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitAssigner.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.splitassigner.file;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.splitassigner.InputSplitAssigner;
-
-/**
- * The file input split assigner is a specific implementation of the {@link InputSplitAssigner} interface for
- * {@link FileInputSplit} objects. The file input split assigner offers to take the storage location of the individual
- * file input splits into account. It attempts to always assign the splits to vertices in a way that the data locality
- * is preserved as well as possible.
- * <p>
- * This class is thread-safe.
- *
- */
-public final class FileInputSplitAssigner implements InputSplitAssigner {
-
- /**
- * The logging object which is used to report information and errors.
- */
- private static final Logger LOG = LoggerFactory.getLogger(FileInputSplitAssigner.class);
-
- private final ConcurrentMap<ExecutionGroupVertex, FileInputSplitList> vertexMap = new ConcurrentHashMap<ExecutionGroupVertex, FileInputSplitList>();
-
-
- @Override
- public void registerGroupVertex(final ExecutionGroupVertex groupVertex) {
-
- if (!FileInputSplit.class.equals(groupVertex.getInputSplitType())) {
- LOG.error(groupVertex.getName() + " produces input splits of type " + groupVertex.getInputSplitType()
- + " and cannot be handled by this split assigner");
- return;
- }
-
- // Ignore vertices that do not produce splits
- final InputSplit[] inputSplits = groupVertex.getInputSplits();
- if (inputSplits == null) {
- return;
- }
-
- if (inputSplits.length == 0) {
- return;
- }
-
- final FileInputSplitList splitStore = new FileInputSplitList();
- if (this.vertexMap.putIfAbsent(groupVertex, splitStore) != null) {
- LOG.error(groupVertex.getName()
- + " appears to be already registered with the file input split assigner, ignoring vertex...");
- return;
- }
-
- synchronized (splitStore) {
-
- for (int i = 0; i < inputSplits.length; ++i) {
- // TODO: Improve this
- final InputSplit inputSplit = inputSplits[i];
- if (!(inputSplit instanceof FileInputSplit)) {
- LOG.error("Input split " + i + " of vertex " + groupVertex.getName() + " is of type "
- + inputSplit.getClass() + ", ignoring split...");
- continue;
- }
- splitStore.addSplit((FileInputSplit) inputSplit);
- }
-
- }
- }
-
- @Override
- public void unregisterGroupVertex(final ExecutionGroupVertex groupVertex) {
- this.vertexMap.remove(groupVertex);
- }
-
- @Override
- public InputSplit getNextInputSplit(final ExecutionVertex vertex) {
-
- final ExecutionGroupVertex groupVertex = vertex.getGroupVertex();
- final FileInputSplitList splitStore = this.vertexMap.get(groupVertex);
-
- if (splitStore == null) {
- return null;
- }
-
- final Instance instance = vertex.getAllocatedResource().getInstance();
- if (instance == null) {
- LOG.error("Instance is null, returning random split");
- return null;
- }
-
- return splitStore.getNextInputSplit(instance);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitList.java
deleted file mode 100644
index 06cca24..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitList.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.splitassigner.file;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.runtime.instance.Instance;
-
-/**
- * The file input split list stores the file input splits for an input vertex that are still expected to be consumed.
- * Besides simply storing the splits, the file input split list also computes the distance all {@link org.apache.flink.runtime.instance.Instance}
- * objects which request a input split and its nearest storage location with respect to the underlying network topology.
- * That way input splits are always given to consuming vertices in a way that data locality is preserved as well as
- * possible.
- * <p>
- * This class is not thread-safe.
- *
- */
-public final class FileInputSplitList {
-
- /**
- * The logging object which is used to report information and errors.
- */
- private static final Logger LOG = LoggerFactory.getLogger(FileInputSplitList.class);
-
- /**
- * The set containing all the file input splits that still must be consumed.
- */
- private Set<FileInputSplit> masterSet = new HashSet<FileInputSplit>();
-
- /**
- * The map caching the specific file input split lists for each {@link org.apache.flink.runtime.instance.Instance}.
- */
- private Map<Instance, Queue<QueueElem>> instanceMap = new HashMap<Instance, Queue<QueueElem>>();
-
- /**
- * This is an auxiliary class to store the minimum distance between a file input split's storage locations and an
- * {@link org.apache.flink.runtime.instance.Instance}.
- *
- */
- private final class QueueElem implements Comparable<QueueElem> {
-
- /**
- * The file input split the distance applies to.
- */
- final FileInputSplit inputSplit;
-
- /**
- * The minimum distance between the file input split's storage locations and the instance this object has been
- * created for.
- */
- final int distance;
-
- /**
- * Creates a new queue element.
- *
- * @param inputSplit
- * the file input split to be stored
- * @param distance
- * the minimum distance between the stored input split's storage locations and the instance this object
- * has been created for
- */
- private QueueElem(final FileInputSplit inputSplit, final int distance) {
- this.inputSplit = inputSplit;
- this.distance = distance;
- }
-
- /**
- * Returns the file input split stored within this object.
- *
- * @return the file input split
- */
- private FileInputSplit getInputSplit() {
- return this.inputSplit;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int compareTo(final QueueElem o) {
-
- return (this.distance - o.distance);
- }
-
- }
-
- /**
- * Adds the given file input split to the set of file input splits to be consumed.
- *
- * @param fileInputSplit
- * the file input split to be added
- */
- synchronized void addSplit(final FileInputSplit fileInputSplit) {
-
- this.masterSet.add(fileInputSplit);
- }
-
- /**
- * Returns the next file input split to be consumed by the given instance. The returned input split is selected in a
- * way that the distance between the split's storage location and the requesting {@link org.apache.flink.runtime.instance.Instance} is as
- * short as possible.
- *
- * @param instance
- * the instance requesting the next file input split
- * @return the next input split to be consumed by the given instance or <code>null</code> if all input splits have
- * already been consumed.
- */
- synchronized FileInputSplit getNextInputSplit(final Instance instance) {
-
- final Queue<QueueElem> instanceSplitList = getInstanceSplitList(instance);
-
- while (true) {
-
- final QueueElem candidate = instanceSplitList.poll();
- if (candidate == null) {
- return null;
- }
-
- if (this.masterSet.remove(candidate.getInputSplit())) {
- if (LOG.isInfoEnabled()) {
- if (candidate.distance == 0) {
- LOG.info(instance + " receives local file input split");
- } else {
- LOG.info(instance + " receives remote file input split (distance " + candidate.distance + ")");
- }
- }
- return candidate.getInputSplit();
- }
-
- if (this.masterSet.isEmpty()) {
- return null;
- }
- }
- }
-
- /**
- * Returns a list of file input splits specifically ordered for the given {@link org.apache.flink.runtime.instance.Instance}. When the list is
- * initially created, it contains all the unconsumed file input splits at that point in time, ascendingly ordered by
- * the minimum distance between the input splits' storage locations and the given {@link org.apache.flink.runtime.instance.Instance}.
- *
- * @param instance
- * the instance for which the file input split list has been computed
- * @return the list of file input splits ordered specifically for the given instance
- */
- private Queue<QueueElem> getInstanceSplitList(final Instance instance) {
-
- Queue<QueueElem> instanceSplitList = this.instanceMap.get(instance);
- if (instanceSplitList == null) {
-
- // Create and populate instance specific split list
- instanceSplitList = new PriorityQueue<FileInputSplitList.QueueElem>();
- final Iterator<FileInputSplit> it = this.masterSet.iterator();
- while (it.hasNext()) {
-
- final FileInputSplit split = it.next();
- final String[] hostNames = split.getHostNames();
- if (hostNames == null) {
- instanceSplitList.add(new QueueElem(split, Integer.MAX_VALUE));
-
- } else {
-
- int minDistance = Integer.MAX_VALUE;
- for (int i = 0; i < hostNames.length; ++i) {
- final int distance = instance.getDistance(hostNames[i]);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Distance between " + instance + " and " + hostNames[i] + " is " + distance);
- }
- if (distance < minDistance) {
- minDistance = distance;
- }
- }
-
- instanceSplitList.add(new QueueElem(split, minDistance));
- }
- }
-
- this.instanceMap.put(instance, instanceSplitList);
- }
-
- return instanceSplitList;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DefaultInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DefaultInstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DefaultInstanceManagerTest.java
index 82e7213..0bec1f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DefaultInstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DefaultInstanceManagerTest.java
@@ -28,12 +28,8 @@ import static org.junit.Assert.fail;
import java.net.InetAddress;
import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.LogUtils;
-
-import org.apache.log4j.Level;
import org.junit.Assert;
-import org.junit.BeforeClass;
import org.junit.Test;
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
index 9380e26..290326c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
@@ -57,10 +57,9 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.taskmanager.transferenvelope.RegisterTaskManagerResult;
import org.apache.flink.runtime.types.IntegerRecord;
-import org.apache.flink.util.LogUtils;
+
import org.junit.After;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -71,12 +70,6 @@ public class LocalInstanceManagerTest {
private int port;
-
- @BeforeClass
- public static void initLogger() {
- LogUtils.initializeDefaultTestConsoleLogger();
- }
-
@Before
public void startJobManagerServer() {
try {