You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:40 UTC
[16/63] [abbrv] Refactor job graph construction to incremental
attachment based
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/FormatUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FormatUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FormatUtil.java
deleted file mode 100644
index 2f89a80..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FormatUtil.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.io;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.util.ReflectionUtil;
-
-/**
- * Provides convenience methods to deal with I/O operations related to {@link InputFormat} and {@link OutputFormat}.
- */
-public class FormatUtil {
-
-
- /**
- * Creates an {@link InputFormat} from a given class for the specified file. The optional {@link Configuration}
- * initializes the format.
- *
- * @param <T>
- * the class of the InputFormat
- * @param inputFormatClass
- * the class of the InputFormat
- * @param path
- * the path of the file
- * @param configuration
- * optional configuration of the InputFormat
- * @return the created {@link InputFormat}
- * @throws IOException
- * if an I/O error occurred while accessing the file or initializing the InputFormat.
- */
- public static <T, F extends FileInputFormat<T>> F openInput(
- Class<F> inputFormatClass, String path, Configuration configuration)
- throws IOException
- {
- configuration = configuration == null ? new Configuration() : configuration;
-
- Path normalizedPath = normalizePath(new Path(path));
- final F inputFormat = ReflectionUtil.newInstance(inputFormatClass);
-
- inputFormat.setFilePath(normalizedPath);
- inputFormat.setOpenTimeout(0);
- inputFormat.configure(configuration);
-
- final FileSystem fs = FileSystem.get(normalizedPath.toUri());
- FileStatus fileStatus = fs.getFileStatus(normalizedPath);
-
- BlockLocation[] blocks = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
- inputFormat.open(new FileInputSplit(0, new Path(path), 0, fileStatus.getLen(), blocks[0].getHosts()));
- return inputFormat;
- }
-
- /**
- * Creates {@link InputFormat}s from a given class for the specified file(s). The optional {@link Configuration}
- * initializes the formats.
- *
- * @param <T>
- * the class of the InputFormat
- * @param inputFormatClass
- * the class of the InputFormat
- * @param path
- * the path of the file or to the directory containing the splits
- * @param configuration
- * optional configuration of the InputFormat
- * @return the created {@link InputFormat}s for each file in the specified path
- * @throws IOException
- * if an I/O error occurred while accessing the files or initializing the InputFormat.
- */
- @SuppressWarnings("unchecked")
- public static <T, F extends FileInputFormat<T>> List<F> openAllInputs(
- Class<F> inputFormatClass, String path, Configuration configuration) throws IOException {
- Path nephelePath = new Path(path);
- FileSystem fs = nephelePath.getFileSystem();
- FileStatus fileStatus = fs.getFileStatus(nephelePath);
- if (!fileStatus.isDir()) {
- return Arrays.asList(openInput(inputFormatClass, path, configuration));
- }
- FileStatus[] list = fs.listStatus(nephelePath);
- List<F> formats = new ArrayList<F>();
- for (int index = 0; index < list.length; index++) {
- formats.add(openInput(inputFormatClass, list[index].getPath().toString(), configuration));
- }
- return formats;
- }
-
- /**
- * Creates an {@link InputFormat} from a given class. The optional {@link Configuration}
- * initializes the format.
- *
- * @param <T>
- * the class of the InputFormat
- * @param inputFormatClass
- * the class of the InputFormat
- * @param configuration
- * optional configuration of the InputFormat
- * @return the created {@link InputFormat}
- * @throws IOException
- * if an I/O error occurred while accessing the file or initializing the InputFormat.
- */
- public static <T, IS extends InputSplit, F extends InputFormat<T, IS>> F openInput(
- Class<F> inputFormatClass, Configuration configuration) throws IOException {
- configuration = configuration == null ? new Configuration() : configuration;
-
- final F inputFormat = ReflectionUtil.newInstance(inputFormatClass);
- inputFormat.configure(configuration);
- final IS[] splits = inputFormat.createInputSplits(1);
- inputFormat.open(splits[0]);
- return inputFormat;
- }
-
- /**
- * Creates an {@link OutputFormat} from a given class for the specified file. The optional {@link Configuration}
- * initializes the format.
- *
- * @param <T>
- * the class of the OutputFormat
- * @param outputFormatClass
- * the class of the OutputFormat
- * @param path
- * the path of the file or to the directory containing the splits
- * @param configuration
- * optional configuration of the OutputFormat
- * @return the created {@link OutputFormat}
- * @throws IOException
- * if an I/O error occurred while accessing the file or initializing the OutputFormat.
- */
- public static <T, F extends FileOutputFormat<? extends T>> F openOutput(
- Class<F> outputFormatClass, String path, Configuration configuration)
- throws IOException
- {
- final F outputFormat = ReflectionUtil.newInstance(outputFormatClass);
- outputFormat.setOutputFilePath(new Path(path));
- outputFormat.setWriteMode(WriteMode.OVERWRITE);
-
- configuration = configuration == null ? new Configuration() : configuration;
-
- outputFormat.configure(configuration);
- outputFormat.open(0, 1);
- return outputFormat;
- }
-
- /**
- * Fixes the path if it denotes a local (relative) file without the proper protocol prefix.
- */
- private static Path normalizePath(Path path) {
- URI uri = path.toUri();
- if (uri.getScheme() == null) {
- try {
- uri = new URI("file", uri.getHost(), uri.getPath(), uri.getFragment());
- path = new Path(uri.toString());
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException("path is invalid", e);
- }
- }
- return path;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java
index 0ddeb64..c108471 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java
@@ -50,7 +50,6 @@ public abstract class GenericInputFormat<OT> implements InputFormat<OT, GenericI
return cachedStatistics;
}
-
@Override
public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
if (numSplits < 1) {
@@ -66,8 +65,8 @@ public abstract class GenericInputFormat<OT> implements InputFormat<OT, GenericI
}
@Override
- public Class<? extends GenericInputSplit> getInputSplitType() {
- return GenericInputSplit.class;
+ public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
+ return new DefaultInputSplitAssigner(splits);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java
index 1bb7815..5eaa657 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.io;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
index cb4019c..6845237 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.io;
import java.io.IOException;
@@ -25,6 +24,8 @@ import java.io.Serializable;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.io.InputSplitSource;
/**
* The base interface for data sources that produces records.
@@ -59,7 +60,7 @@ import org.apache.flink.core.io.InputSplit;
* @param <OT> The type of the produced records.
* @param <T> The type of input split.
*/
-public interface InputFormat<OT, T extends InputSplit> extends Serializable {
+public interface InputFormat<OT, T extends InputSplit> extends InputSplitSource<T>, Serializable {
/**
* Configures this input format. Since input formats are instantiated generically and hence parameterless,
@@ -95,6 +96,7 @@ public interface InputFormat<OT, T extends InputSplit> extends Serializable {
*
* @throws IOException Thrown, when the creation of the splits was erroneous.
*/
+ @Override
T[] createInputSplits(int minNumSplits) throws IOException;
/**
@@ -102,7 +104,8 @@ public interface InputFormat<OT, T extends InputSplit> extends Serializable {
*
* @return The assigner for the input splits.
*/
- Class<? extends T> getInputSplitType();
+ @Override
+ InputSplitAssigner getInputSplitAssigner(T[] inputSplits);
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
new file mode 100644
index 0000000..6243681
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.io.LocatableInputSplit;
+
+/**
+ * The locatable input split assigner assigns to each host splits that are local, before assigning
+ * splits that are not local.
+ */
+public final class LocatableInputSplitAssigner implements InputSplitAssigner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitAssigner.class);
+
+
+    /** All splits that have not been handed out yet. Its monitor also guards the two counters below. */
+    private final Set<LocatableInputSplit> unassigned = new HashSet<LocatableInputSplit>();
+
+    /** Candidate local splits per lower-cased host name, computed on the first request of that host. */
+    private final ConcurrentHashMap<String, List<LocatableInputSplit>> localPerHost = new ConcurrentHashMap<String, List<LocatableInputSplit>>();
+
+    private int localAssignments;        // guarded by the lock on the unassigned set
+
+    private int remoteAssignments;       // guarded by the lock on the unassigned set
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Creates an assigner that hands out the given splits.
+     *
+     * @param splits The splits to assign.
+     */
+    public LocatableInputSplitAssigner(Collection<LocatableInputSplit> splits) {
+        this.unassigned.addAll(splits);
+    }
+
+    /**
+     * Creates an assigner that hands out the given splits.
+     *
+     * @param splits The splits to assign.
+     */
+    public LocatableInputSplitAssigner(LocatableInputSplit[] splits) {
+        Collections.addAll(this.unassigned, splits);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Returns the next split for the given host. Splits that list the host among their host names
+     * are handed out first; when none of those is left (or the host is {@code null}), an arbitrary
+     * unassigned split is returned.
+     *
+     * @param host The name of the requesting host, or {@code null} if it is unknown.
+     * @return The next split, or {@code null} if all splits have been assigned.
+     */
+    @Override
+    public LocatableInputSplit getNextInputSplit(String host) {
+        // for a null host, we return an arbitrary split
+        if (host == null) {
+            return takeArbitrarySplit(null);
+        }
+
+        host = host.toLowerCase(Locale.US);
+
+        // for any non-null host, we take the list of local splits
+        List<LocatableInputSplit> localSplits = this.localPerHost.get(host);
+
+        // if we have no list for this host yet, create one
+        if (localSplits == null) {
+            localSplits = new ArrayList<LocatableInputSplit>(16);
+
+            // lock the list, to be sure that others have to wait for that host's local list
+            synchronized (localSplits) {
+                List<LocatableInputSplit> prior = this.localPerHost.putIfAbsent(host, localSplits);
+
+                // if someone else beat us in the race to create this list, then we do not populate this one, but
+                // simply work with that other list
+                if (prior == null) {
+                    // we are the first, we populate
+
+                    // first, copy the remaining splits to release the lock on the set early
+                    // because that is shared among threads
+                    LocatableInputSplit[] remaining;
+                    synchronized (this.unassigned) {
+                        remaining = this.unassigned.toArray(new LocatableInputSplit[this.unassigned.size()]);
+                    }
+
+                    for (LocatableInputSplit is : remaining) {
+                        if (isLocal(host, is.getHostnames())) {
+                            localSplits.add(is);
+                        }
+                    }
+                }
+                else {
+                    // someone else was faster
+                    localSplits = prior;
+                }
+            }
+        }
+
+        // at this point, we have a list of local splits (possibly empty)
+        // we need to make sure no one else operates in the current list (that protects against
+        // list creation races) and that the unassigned set is consistent
+        // NOTE: we need to obtain the locks in this order, strictly!!!
+        synchronized (localSplits) {
+            int size = localSplits.size();
+            if (size > 0) {
+                synchronized (this.unassigned) {
+                    do {
+                        --size;
+                        LocatableInputSplit split = localSplits.remove(size);
+                        // the candidate may have been handed to another host in the meantime;
+                        // only splits still contained in the unassigned set may be returned
+                        if (this.unassigned.remove(split)) {
+
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Assigning local split to host " + host);
+                            }
+
+                            localAssignments++;
+                            return split;
+                        }
+                    } while (size > 0);
+                }
+            }
+        }
+
+        // we did not find a local split, return any
+        return takeArbitrarySplit(host);
+    }
+
+    /**
+     * Removes and returns an arbitrary unassigned split and counts it as a remote assignment.
+     *
+     * @param host The (normalized) requesting host, or {@code null}; used for logging only.
+     * @return An unassigned split, or {@code null} if none is left.
+     */
+    private LocatableInputSplit takeArbitrarySplit(String host) {
+        synchronized (this.unassigned) {
+            Iterator<LocatableInputSplit> iter = this.unassigned.iterator();
+            if (iter.hasNext()) {
+                LocatableInputSplit next = iter.next();
+                iter.remove();
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(host == null
+                            ? "Assigning arbitrary split to null host."
+                            : "Assigning remote split to host " + host);
+                }
+
+                remoteAssignments++;
+                return next;
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("No more input splits remaining.");
+                }
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Checks whether the given (already lower-cased) host name is contained in the given host names.
+     *
+     * @param host The lower-cased name of the requesting host.
+     * @param hosts The host names of a split; individual entries may be {@code null}.
+     * @return {@code true} if one of the names equals the host, ignoring case.
+     */
+    private static boolean isLocal(String host, String[] hosts) {
+        if (host == null || hosts == null) {
+            return false;
+        }
+
+        for (String h : hosts) {
+            // lower-case with the same fixed locale that was used for the requesting host, so that
+            // the comparison does not depend on the JVM's default locale
+            if (h != null && host.equals(h.toLowerCase(Locale.US))) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Gets the number of splits that were handed to a host listed among the split's host names.
+     *
+     * @return The number of local assignments so far.
+     */
+    public int getNumberOfLocalAssignments() {
+        // the counter is only written under this lock, so read it under the same lock
+        synchronized (this.unassigned) {
+            return localAssignments;
+        }
+    }
+
+    /**
+     * Gets the number of splits that were handed out without a matching host name.
+     *
+     * @return The number of remote assignments so far.
+     */
+    public int getNumberOfRemoteAssignments() {
+        // the counter is only written under this lock, so read it under the same lock
+        synchronized (this.unassigned) {
+            return remoteAssignments;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java b/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java
index 157df71..d4fce5b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java
@@ -18,10 +18,9 @@
package org.apache.flink.api.common.io;
-
/**
* This interface acts as a marker for input formats for inputs which cannot be split.
- * Data sources with a Sequential input formats are always executed with a degree-of-parallelism
+ * Data sources with a non-parallel input format are always executed with a degree-of-parallelism
* of one.
*
* @see InputFormat
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java
index cd02ac9..ddf9cbc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.io;
import java.io.IOException;
@@ -24,7 +23,6 @@ import java.io.Serializable;
import org.apache.flink.configuration.Configuration;
-
/**
* The base interface for outputs that consumes records. The output format
* describes how to store the final records, for example in a file.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
index c4a69ba..ebee5d0 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
@@ -151,19 +151,12 @@ public class FileInputSplit extends LocatableInputSplit {
if (obj == this) {
return true;
}
- else if (obj != null && super.equals(obj) && obj instanceof FileInputSplit) {
+ else if (obj != null && obj instanceof FileInputSplit && super.equals(obj)) {
FileInputSplit other = (FileInputSplit) obj;
- if (this.file != null) {
- if (!this.file.equals(other.file)) {
- return false;
- }
- }
- else if (other.file != null) {
- return false;
- }
-
- return this.start == other.start && this.length == other.length;
+ return this.start == other.start &&
+ this.length == other.length &&
+ (this.file == null ? other.file == null : (other.file != null && this.file.equals(other.file)));
}
else {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
index 850ba1c..52018a1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
@@ -78,7 +78,7 @@ public class GenericInputSplit implements InputSplit, java.io.Serializable {
}
@Override
- public void read(final DataInputView in) throws IOException {
+ public void read(DataInputView in) throws IOException {
this.partitionNumber = in.readInt();
this.totalNumberOfPartitions = in.readInt();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
index c054fb8..1a51207 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.core.io;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java
new file mode 100644
index 0000000..256b9c7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+/**
+ * InputSplitSources create {@link InputSplit}s that define portions of data to be produced
+ * by {@link org.apache.flink.api.common.io.InputFormat}s.
+ *
+ * @param <T> The type of the input splits created by the source.
+ */
+public interface InputSplitSource<T extends InputSplit> extends java.io.Serializable {
+
+ /**
+ * Computes the input splits. The given minimum number of splits is a hint as to how
+ * many splits are desired.
+ *
+ * @param minNumSplits Number of minimal input splits, as a hint.
+ * @return An array of input splits.
+ *
+ * @throws Exception Exceptions when creating the input splits may be forwarded and will cause the
+ * execution to permanently fail.
+ */
+ T[] createInputSplits(int minNumSplits) throws Exception;
+
+ /**
+ * Returns the assigner for the input splits. The assigner determines which parallel instance of the
+ * input format gets which input split.
+ *
+ * @param inputSplits The input splits to be assigned.
+ * @return The input split assigner.
+ */
+ InputSplitAssigner getInputSplitAssigner(T[] inputSplits);
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java
new file mode 100644
index 0000000..25835f5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An {@link InputStream} that takes its bytes from a {@link DataInputView}.
+ */
+public final class DataInputViewStream extends InputStream {
+
+    private final DataInputView inputView;
+
+
+    /**
+     * Creates a stream that reads from the given view.
+     *
+     * @param inputView The view that supplies the bytes.
+     */
+    public DataInputViewStream(DataInputView inputView) {
+        this.inputView = inputView;
+    }
+
+
+    /**
+     * Reads the next byte from the view.
+     *
+     * @return The byte as a value between 0 and 255, or -1 if the view is exhausted.
+     * @throws IOException Thrown, if the view fails to read.
+     */
+    @Override
+    public int read() throws IOException {
+        try {
+            // readByte() yields a signed byte; InputStream.read() must return 0..255, otherwise
+            // the byte 0xFF would be reported as -1 and be mistaken for the end of the stream
+            return inputView.readByte() & 0xff;
+        }
+        catch (java.io.EOFException e) {
+            // presumably the view signals exhaustion like java.io.DataInput does; the
+            // InputStream contract expresses that as -1
+            return -1;
+        }
+    }
+
+    /**
+     * Reads exactly {@code len} bytes from the view into the given array.
+     *
+     * @param b The array to read into.
+     * @param off The position in the array at which the first byte is stored.
+     * @param len The number of bytes to read.
+     * @return Always {@code len}, because the call reads the requested bytes fully.
+     * @throws IOException Thrown, if the view fails to supply {@code len} bytes.
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        inputView.readFully(b, off, len);
+        return len;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java
new file mode 100644
index 0000000..f3188aa
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An {@link OutputStream} that forwards everything written to it to a {@link DataOutputView}.
+ */
+public final class DataOutputViewStream extends OutputStream {
+
+    /** The view that receives all bytes written to this stream. */
+    private final DataOutputView target;
+
+
+    /**
+     * Creates a stream that writes to the given view.
+     *
+     * @param outputView The view that receives the bytes.
+     */
+    public DataOutputViewStream(DataOutputView outputView) {
+        target = outputView;
+    }
+
+
+    /**
+     * Forwards a single byte to the view.
+     *
+     * @param b The byte to write.
+     * @throws IOException Thrown, if the view fails to write.
+     */
+    @Override
+    public void write(int b) throws IOException {
+        target.write(b);
+    }
+
+    /**
+     * Forwards a range of the given array to the view.
+     *
+     * @param b The array holding the bytes.
+     * @param off The position of the first byte to write.
+     * @param len The number of bytes to write.
+     * @throws IOException Thrown, if the view fails to write.
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        target.write(b, off, len);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java b/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
index 00ad8e5..7e891bb 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.util;
import java.util.HashMap;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
new file mode 100644
index 0000000..8f9822e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+/**
+ * A collection of static helpers for working with exceptions.
+ */
+public class ExceptionUtils {
+
+    /**
+     * Renders the stack trace of the given exception into a string.
+     *
+     * @param e The exception to stringify.
+     * @return A string with exception name and call stack.
+     */
+    public static String stringifyException(final Throwable e) {
+        final StringWriter buffer = new StringWriter();
+        final PrintWriter printer = new PrintWriter(buffer);
+        e.printStackTrace(printer);
+        printer.close();
+        return buffer.toString();
+    }
+
+    /**
+     * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to
+     * throw arbitrary Throwables. Errors and RuntimeExceptions are thrown directly, other exceptions
+     * are packed into runtime exceptions.
+     *
+     * @param t The throwable to be thrown.
+     */
+    public static void rethrow(Throwable t) {
+        if (t instanceof Error) {
+            throw (Error) t;
+        }
+        if (t instanceof RuntimeException) {
+            throw (RuntimeException) t;
+        }
+        // anything else is a checked exception (or null) and needs an unchecked wrapper
+        throw new RuntimeException(t);
+    }
+
+    /**
+     * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to
+     * throw arbitrary Throwables. Errors and RuntimeExceptions are thrown directly, other exceptions
+     * are packed into a parent RuntimeException.
+     *
+     * @param t The throwable to be thrown.
+     * @param parentMessage The message for the parent RuntimeException, if one is needed.
+     */
+    public static void rethrow(Throwable t, String parentMessage) {
+        if (t instanceof Error) {
+            throw (Error) t;
+        }
+        if (t instanceof RuntimeException) {
+            throw (RuntimeException) t;
+        }
+        // anything else is a checked exception (or null) and needs an unchecked wrapper
+        throw new RuntimeException(parentMessage, t);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java b/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java
index 2bdbc60..755e188 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java
@@ -21,13 +21,10 @@ package org.apache.flink.util;
import java.util.Iterator;
import java.util.NoSuchElementException;
-
-
-/**
- *
- */
/**
- *
+ * The {@code NumberSequenceIterator} is an iterator that returns a sequence of numbers (as {@code Long}s).
+ * The iterator is splittable (as defined by {@link SplittableIterator}), i.e., it can be divided into multiple
+ * iterators that each return a subsequence of the number sequence.
*/
public class NumberSequenceIterator implements SplittableIterator<Long> {
@@ -38,7 +35,12 @@ public class NumberSequenceIterator implements SplittableIterator<Long> {
private long current;
-
+ /**
+ * Creates a new iterator over the numbers from {@code from} to {@code to}; {@code to} must not be smaller than {@code from}.
+ *
+ * @param from The first number returned by the iterator.
+ * @param to The last number returned by the iterator.
+ */
public NumberSequenceIterator(long from, long to) {
if (from > to) {
throw new IllegalArgumentException("The 'to' value must not be smaller than the 'from' value.");
@@ -52,11 +54,11 @@ public class NumberSequenceIterator implements SplittableIterator<Long> {
/**
* Internal constructor to allow for empty iterators.
*
- * @param from
- * @param to
- * @param mark
+ * @param from The first number returned by the iterator.
+ * @param to The last number returned by the iterator.
+ * @param unused A dummy parameter to disambiguate the constructor.
*/
- private NumberSequenceIterator(long from, long to, boolean mark) {
+ private NumberSequenceIterator(long from, long to, boolean unused) {
this.current = from;
this.to = to;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java b/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java
index c3e6362..cfbd14e 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java
@@ -16,14 +16,12 @@
* limitations under the License.
*/
-
package org.apache.flink.util;
import java.io.Serializable;
import org.apache.flink.types.StringValue;
-
/**
* Utility class for efficient string operations on strings. All methods in this class are
* written to be optimized for efficiency and work only on strings whose characters are
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
index a15c31c..7d50d12 100644
--- a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
+++ b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
@@ -18,10 +18,17 @@
package org.apache.flink.util;
+/**
+ * An exception indicating that an {@link java.lang.Iterable} can only be traversed once, but an attempt
+ * was made to traverse it an additional time.
+ */
public class TraversableOnceException extends RuntimeException {
private static final long serialVersionUID = 7636881584773577290L;
+ /**
+ * Creates a new exception with a default message.
+ */
public TraversableOnceException() {
super("The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed.");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java
new file mode 100644
index 0000000..2f89a80
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.io;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.util.ReflectionUtil;
+
+/**
+ * Provides convenience methods to deal with I/O operations related to {@link InputFormat} and {@link OutputFormat}.
+ */
+public class FormatUtil {
+
+
+ /**
+ * Creates an {@link InputFormat} from a given class for the specified file. The optional {@link Configuration}
+ * initializes the format.
+ *
+ * @param <T>
+ * the class of the InputFormat
+ * @param inputFormatClass
+ * the class of the InputFormat
+ * @param path
+ * the path of the file
+ * @param configuration
+ * optional configuration of the InputFormat
+ * @return the created {@link InputFormat}
+ * @throws IOException
+ * if an I/O error occurred while accessing the file or initializing the InputFormat.
+ */
+ public static <T, F extends FileInputFormat<T>> F openInput(
+ Class<F> inputFormatClass, String path, Configuration configuration)
+ throws IOException
+ {
+ configuration = configuration == null ? new Configuration() : configuration;
+
+ Path normalizedPath = normalizePath(new Path(path));
+ final F inputFormat = ReflectionUtil.newInstance(inputFormatClass);
+
+ inputFormat.setFilePath(normalizedPath);
+ inputFormat.setOpenTimeout(0);
+ inputFormat.configure(configuration);
+
+ final FileSystem fs = FileSystem.get(normalizedPath.toUri());
+ FileStatus fileStatus = fs.getFileStatus(normalizedPath);
+
+ BlockLocation[] blocks = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+ inputFormat.open(new FileInputSplit(0, new Path(path), 0, fileStatus.getLen(), blocks[0].getHosts()));
+ return inputFormat;
+ }
+
+ /**
+ * Creates {@link InputFormat}s from a given class for the specified file(s). The optional {@link Configuration}
+ * initializes the formats.
+ *
+ * @param <T>
+ * the class of the InputFormat
+ * @param inputFormatClass
+ * the class of the InputFormat
+ * @param path
+ * the path of the file or to the directory containing the splits
+ * @param configuration
+ * optional configuration of the InputFormat
+ * @return the created {@link InputFormat}s for each file in the specified path
+ * @throws IOException
+ * if an I/O error occurred while accessing the files or initializing the InputFormat.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T, F extends FileInputFormat<T>> List<F> openAllInputs(
+ Class<F> inputFormatClass, String path, Configuration configuration) throws IOException {
+ Path nephelePath = new Path(path);
+ FileSystem fs = nephelePath.getFileSystem();
+ FileStatus fileStatus = fs.getFileStatus(nephelePath);
+ if (!fileStatus.isDir()) {
+ return Arrays.asList(openInput(inputFormatClass, path, configuration));
+ }
+ FileStatus[] list = fs.listStatus(nephelePath);
+ List<F> formats = new ArrayList<F>();
+ for (int index = 0; index < list.length; index++) {
+ formats.add(openInput(inputFormatClass, list[index].getPath().toString(), configuration));
+ }
+ return formats;
+ }
+
+ /**
+ * Creates an {@link InputFormat} from a given class. The optional {@link Configuration}
+ * initializes the format.
+ *
+ * @param <T>
+ * the class of the InputFormat
+ * @param inputFormatClass
+ * the class of the InputFormat
+ * @param configuration
+ * optional configuration of the InputFormat
+ * @return the created {@link InputFormat}
+ * @throws IOException
+ * if an I/O error occurred while accessing the file or initializing the InputFormat.
+ */
+ public static <T, IS extends InputSplit, F extends InputFormat<T, IS>> F openInput(
+ Class<F> inputFormatClass, Configuration configuration) throws IOException {
+ configuration = configuration == null ? new Configuration() : configuration;
+
+ final F inputFormat = ReflectionUtil.newInstance(inputFormatClass);
+ inputFormat.configure(configuration);
+ final IS[] splits = inputFormat.createInputSplits(1);
+ inputFormat.open(splits[0]);
+ return inputFormat;
+ }
+
+ /**
+ * Creates an {@link OutputFormat} from a given class for the specified file. The optional {@link Configuration}
+ * initializes the format.
+ *
+ * @param <T>
+ * the class of the OutputFormat
+ * @param outputFormatClass
+ * the class of the OutputFormat
+ * @param path
+ * the path of the file or to the directory containing the splits
+ * @param configuration
+ * optional configuration of the OutputFormat
+ * @return the created {@link OutputFormat}
+ * @throws IOException
+ * if an I/O error occurred while accessing the file or initializing the OutputFormat.
+ */
+ public static <T, F extends FileOutputFormat<? extends T>> F openOutput(
+ Class<F> outputFormatClass, String path, Configuration configuration)
+ throws IOException
+ {
+ final F outputFormat = ReflectionUtil.newInstance(outputFormatClass);
+ outputFormat.setOutputFilePath(new Path(path));
+ outputFormat.setWriteMode(WriteMode.OVERWRITE);
+
+ configuration = configuration == null ? new Configuration() : configuration;
+
+ outputFormat.configure(configuration);
+ outputFormat.open(0, 1);
+ return outputFormat;
+ }
+
+ /**
+ * Fixes the path if it denotes a local (relative) file without the proper protocol prefix.
+ */
+ private static Path normalizePath(Path path) {
+ URI uri = path.toUri();
+ if (uri.getScheme() == null) {
+ try {
+ uri = new URI("file", uri.getHost(), uri.getPath(), uri.getFragment());
+ path = new Path(uri.toString());
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("path is invalid", e);
+ }
+ }
+ return path;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java b/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java
new file mode 100644
index 0000000..b88fcc5
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import static org.junit.Assert.*;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.junit.Test;
+
+
+public class DefaultSplitAssignerTest {
+
+ @Test
+ public void testSerialSplitAssignment() {
+ try {
+ final int NUM_SPLITS = 50;
+
+ Set<InputSplit> splits = new HashSet<InputSplit>();
+ for (int i = 0; i < NUM_SPLITS; i++) {
+ splits.add(new GenericInputSplit(i, NUM_SPLITS));
+ }
+
+ DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
+ InputSplit is = null;
+ while ((is = ia.getNextInputSplit("")) != null) {
+ assertTrue(splits.remove(is));
+ }
+
+ assertTrue(splits.isEmpty());
+ assertNull(ia.getNextInputSplit(""));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testConcurrentSplitAssignment() {
+ try {
+ final int NUM_THREADS = 10;
+ final int NUM_SPLITS = 500;
+ final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
+
+ Set<InputSplit> splits = new HashSet<InputSplit>();
+ for (int i = 0; i < NUM_SPLITS; i++) {
+ splits.add(new GenericInputSplit(i, NUM_SPLITS));
+ }
+
+ final DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
+
+ final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+ final AtomicInteger sumOfIds = new AtomicInteger(0);
+
+ Runnable retriever = new Runnable() {
+
+ @Override
+ public void run() {
+ String host = "";
+ GenericInputSplit split;
+ while ((split = (GenericInputSplit) ia.getNextInputSplit(host)) != null) {
+ splitsRetrieved.incrementAndGet();
+ sumOfIds.addAndGet(split.getSplitNumber());
+ }
+ }
+ };
+
+ // create the threads
+ Thread[] threads = new Thread[NUM_THREADS];
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i] = new Thread(retriever);
+ threads[i].setDaemon(true);
+ }
+
+ // launch concurrently
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i].start();
+ }
+
+ // sync
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i].join(5000);
+ }
+
+ // verify
+ for (int i = 0; i < NUM_THREADS; i++) {
+ if (threads[i].isAlive()) {
+ fail("The concurrency test case is erroneous, the thread did not respond in time.");
+ }
+ }
+
+ assertEquals(NUM_SPLITS, splitsRetrieved.get());
+ assertEquals(SUM_OF_IDS, sumOfIds.get());
+
+ // nothing left
+ assertNull(ia.getNextInputSplit(""));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
new file mode 100644
index 0000000..aa56c1c
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import static org.junit.Assert.*;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.util.LogUtils;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class LocatableSplitAssignerTest {
+
+ @Test
+ public void testSerialSplitAssignmentWithNullHost() {
+ try {
+ final int NUM_SPLITS = 50;
+ final String[][] hosts = new String[][] {
+ new String[] { "localhost" },
+ new String[0],
+ null
+ };
+
+ // load some splits
+ Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+ for (int i = 0; i < NUM_SPLITS; i++) {
+ splits.add(new LocatableInputSplit(i, hosts[i%3]));
+ }
+
+ // get all available splits
+ LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+ InputSplit is = null;
+ while ((is = ia.getNextInputSplit(null)) != null) {
+ assertTrue(splits.remove(is));
+ }
+
+ // check we had all
+ assertTrue(splits.isEmpty());
+ assertNull(ia.getNextInputSplit(""));
+ assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
+ assertEquals(0, ia.getNumberOfLocalAssignments());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSerialSplitAssignmentAllForSameHost() {
+ try {
+ final int NUM_SPLITS = 50;
+
+ // load some splits
+ Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+ for (int i = 0; i < NUM_SPLITS; i++) {
+ splits.add(new LocatableInputSplit(i, "testhost"));
+ }
+
+ // get all available splits
+ LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+ InputSplit is = null;
+ while ((is = ia.getNextInputSplit("testhost")) != null) {
+ assertTrue(splits.remove(is));
+ }
+
+ // check we had all
+ assertTrue(splits.isEmpty());
+ assertNull(ia.getNextInputSplit(""));
+
+ assertEquals(0, ia.getNumberOfRemoteAssignments());
+ assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSerialSplitAssignmentAllForRemoteHost() {
+ try {
+ final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
+ final int NUM_SPLITS = 10 * hosts.length;
+
+ // load some splits
+ Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+ for (int i = 0; i < NUM_SPLITS; i++) {
+ splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
+ }
+
+ // get all available splits
+ LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+ InputSplit is = null;
+ while ((is = ia.getNextInputSplit("testhost")) != null) {
+ assertTrue(splits.remove(is));
+ }
+
+ // check we had all
+ assertTrue(splits.isEmpty());
+ assertNull(ia.getNextInputSplit("anotherHost"));
+
+ assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
+ assertEquals(0, ia.getNumberOfLocalAssignments());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSerialSplitAssignmentMixedLocalHost() {
+ try {
+ final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
+ final int NUM_SPLITS = 10 * hosts.length;
+
+ // load some splits
+ Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+ for (int i = 0; i < NUM_SPLITS; i++) {
+ splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
+ }
+
+ // get all available splits
+ LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+ InputSplit is = null;
+ int i = 0;
+ while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length])) != null) {
+ assertTrue(splits.remove(is));
+ }
+
+ // check we had all
+ assertTrue(splits.isEmpty());
+ assertNull(ia.getNextInputSplit("anotherHost"));
+
+ assertEquals(0, ia.getNumberOfRemoteAssignments());
+ assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testConcurrentSplitAssignmentNullHost() {
+ try {
+ final int NUM_THREADS = 10;
+ final int NUM_SPLITS = 500;
+ final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
+
+ final String[][] hosts = new String[][] {
+ new String[] { "localhost" },
+ new String[0],
+ null
+ };
+
+ // load some splits
+ Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+ for (int i = 0; i < NUM_SPLITS; i++) {
+ splits.add(new LocatableInputSplit(i, hosts[i%3]));
+ }
+
+ final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+
+ final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+ final AtomicInteger sumOfIds = new AtomicInteger(0);
+
+ Runnable retriever = new Runnable() {
+
+ @Override
+ public void run() {
+ LocatableInputSplit split;
+ while ((split = ia.getNextInputSplit(null)) != null) {
+ splitsRetrieved.incrementAndGet();
+ sumOfIds.addAndGet(split.getSplitNumber());
+ }
+ }
+ };
+
+ // create the threads
+ Thread[] threads = new Thread[NUM_THREADS];
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i] = new Thread(retriever);
+ threads[i].setDaemon(true);
+ }
+
+ // launch concurrently
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i].start();
+ }
+
+ // sync
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i].join(5000);
+ }
+
+ // verify
+ for (int i = 0; i < NUM_THREADS; i++) {
+ if (threads[i].isAlive()) {
+ fail("The concurrency test case is erroneous, the thread did not respond in time.");
+ }
+ }
+
+ assertEquals(NUM_SPLITS, splitsRetrieved.get());
+ assertEquals(SUM_OF_IDS, sumOfIds.get());
+
+ // nothing left
+ assertNull(ia.getNextInputSplit(""));
+
+ assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
+ assertEquals(0, ia.getNumberOfLocalAssignments());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testConcurrentSplitAssignmentForSingleHost() {
+ try {
+ final int NUM_THREADS = 10;
+ final int NUM_SPLITS = 500;
+ final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
+
+ // load some splits
+ Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+ for (int i = 0; i < NUM_SPLITS; i++) {
+ splits.add(new LocatableInputSplit(i, "testhost"));
+ }
+
+ final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+
+ final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+ final AtomicInteger sumOfIds = new AtomicInteger(0);
+
+ Runnable retriever = new Runnable() {
+
+ @Override
+ public void run() {
+ LocatableInputSplit split;
+ while ((split = ia.getNextInputSplit("testhost")) != null) {
+ splitsRetrieved.incrementAndGet();
+ sumOfIds.addAndGet(split.getSplitNumber());
+ }
+ }
+ };
+
+ // create the threads
+ Thread[] threads = new Thread[NUM_THREADS];
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i] = new Thread(retriever);
+ threads[i].setDaemon(true);
+ }
+
+ // launch concurrently
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i].start();
+ }
+
+ // sync
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i].join(5000);
+ }
+
+ // verify
+ for (int i = 0; i < NUM_THREADS; i++) {
+ if (threads[i].isAlive()) {
+ fail("The concurrency test case is erroneous, the thread did not respond in time.");
+ }
+ }
+
+ assertEquals(NUM_SPLITS, splitsRetrieved.get());
+ assertEquals(SUM_OF_IDS, sumOfIds.get());
+
+ // nothing left
+ assertNull(ia.getNextInputSplit("testhost"));
+
+ assertEquals(0, ia.getNumberOfRemoteAssignments());
+ assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testConcurrentSplitAssignmentForMultipleHosts() {
+ try {
+ final int NUM_THREADS = 10;
+ final int NUM_SPLITS = 500;
+ final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
+
+ final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
+
+ // load some splits
+ Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+ for (int i = 0; i < NUM_SPLITS; i++) {
+ splits.add(new LocatableInputSplit(i, hosts[i%hosts.length]));
+ }
+
+ final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+
+ final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+ final AtomicInteger sumOfIds = new AtomicInteger(0);
+
+ Runnable retriever = new Runnable() {
+
+ @Override
+ public void run() {
+ final String threadHost = hosts[(int) (Math.random() * hosts.length)];
+
+ LocatableInputSplit split;
+ while ((split = ia.getNextInputSplit(threadHost)) != null) {
+ splitsRetrieved.incrementAndGet();
+ sumOfIds.addAndGet(split.getSplitNumber());
+ }
+ }
+ };
+
+ // create the threads
+ Thread[] threads = new Thread[NUM_THREADS];
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i] = new Thread(retriever);
+ threads[i].setDaemon(true);
+ }
+
+ // launch concurrently
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i].start();
+ }
+
+ // sync
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i].join(5000);
+ }
+
+ // verify
+ for (int i = 0; i < NUM_THREADS; i++) {
+ if (threads[i].isAlive()) {
+ fail("The concurrency test case is erroneous, the thread did not respond in time.");
+ }
+ }
+
+ assertEquals(NUM_SPLITS, splitsRetrieved.get());
+ assertEquals(SUM_OF_IDS, sumOfIds.get());
+
+ // nothing left
+ assertNull(ia.getNextInputSplit("testhost"));
+
+ // at least a 1/hosts.length fraction of the splits needs to be local, no matter how badly the threads race
+ assertTrue(ia.getNumberOfLocalAssignments() >= NUM_SPLITS / hosts.length);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
index 3518927..a5d2b91 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
@@ -22,7 +22,7 @@ package org.apache.flink.api.java.record.io;
import java.io.IOException;
import org.junit.Assert;
-
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.record.io.ExternalProcessFixedLengthInputFormat;
import org.apache.flink.api.java.record.io.ExternalProcessInputFormat;
@@ -286,8 +286,8 @@ private ExternalProcessFixedLengthInputFormat<ExternalProcessInputSplit> format;
}
@Override
- public Class<GenericInputSplit> getInputSplitType() {
- return GenericInputSplit.class;
+ public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
+ return new DefaultInputSplitAssigner(splits);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
index 1b86ecb..96f6664 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
@@ -16,13 +16,12 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.record.io;
import java.io.IOException;
import org.junit.Assert;
-
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.record.io.ExternalProcessInputFormat;
import org.apache.flink.api.java.record.io.ExternalProcessInputSplit;
@@ -225,10 +224,10 @@ public class ExternalProcessInputFormatTest {
}
@Override
- public Class<GenericInputSplit> getInputSplitType() {
- return GenericInputSplit.class;
+ public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
+ return new DefaultInputSplitAssigner(splits);
}
-
+
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java
index 2ff49e0..7ec5a6a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
import java.io.IOException;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
@@ -141,7 +142,7 @@ public class TypeExtractorInputFormatsTest {
public InputSplit[] createInputSplits(int minNumSplits) { return null; }
@Override
- public Class<? extends InputSplit> getInputSplitType() { return null; }
+ public DefaultInputSplitAssigner getInputSplitAssigner(InputSplit[] splits) { return null; }
@Override
public void open(InputSplit split) {}
@@ -211,7 +212,7 @@ public class TypeExtractorInputFormatsTest {
public InputSplit[] createInputSplits(int minNumSplits) { return null; }
@Override
- public Class<? extends InputSplit> getInputSplitType() { return null; }
+ public DefaultInputSplitAssigner getInputSplitAssigner(InputSplit[] splits) { return null; }
@Override
public void open(InputSplit split) {}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
index c187961..1fa9491 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
@@ -41,6 +41,7 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
/** The size of the ID in byte */
public static final int SIZE = 2 * SIZE_OF_LONG;
+
/** The upper part of the actual ID */
private long upperPart;
@@ -48,11 +49,13 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
/** The lower part of the actual ID */
private long lowerPart;
+ // --------------------------------------------------------------------------------------------
+
/**
* Constructs a new ID with a specific bytes value.
*/
public AbstractID(byte[] bytes) {
- if (bytes.length != SIZE) {
+ if (bytes == null || bytes.length != SIZE) {
throw new IllegalArgumentException("Argument bytes must by an array of " + SIZE + " bytes");
}
@@ -80,6 +83,9 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
* @param id the abstract ID to copy
*/
public AbstractID(AbstractID id) {
+ if (id == null) {
+ throw new IllegalArgumentException("Id must not be null.");
+ }
this.lowerPart = id.lowerPart;
this.upperPart = id.upperPart;
}
@@ -91,7 +97,19 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
this.lowerPart = generateRandomLong();
this.upperPart = generateRandomLong();
}
+
+ // --------------------------------------------------------------------------------------------
+
+ public long getLowerPart() {
+ return lowerPart;
+ }
+
+ public long getUpperPart() {
+ return upperPart;
+ }
+ // --------------------------------------------------------------------------------------------
+
/**
* Generates a uniformly distributed random positive long.
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/JobException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/JobException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/JobException.java
new file mode 100644
index 0000000..c4a4211
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/JobException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime;
+
+public class JobException extends Exception {
+
+ private static final long serialVersionUID = 1275864691743020176L;
+
+ public JobException(String msg) {
+ super(msg);
+ }
+
+ public JobException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/JobSubmissionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/JobSubmissionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/JobSubmissionException.java
deleted file mode 100644
index 4960d80..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/JobSubmissionException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime;
-
-/**
- * A job submission exception is thrown if an error occurs while submitting
- * a job from the client to the job manager.
- *
- */
-public class JobSubmissionException extends Exception {
-
- /**
- * Generated serial UID
- */
- private static final long serialVersionUID = 1275864691743020176L;
-
- /**
- * Constructs a new job submission exception with the given error message.
- *
- * @param msg
- * the error message to be transported through this exception
- */
- public JobSubmissionException(String msg) {
- super(msg);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java
index 2ca8b68..416453f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java
@@ -37,18 +37,13 @@ public abstract class AbstractJobResult implements IOReadableWritable {
/**
* The possible return codes for a job operation.
- *
*/
public enum ReturnCode {
- /**
- * The success return code.
- */
+ /** The success return code. */
SUCCESS,
- /**
- * The error return code.
- */
+ /** The error return code. */
ERROR
};
@@ -60,7 +55,7 @@ public abstract class AbstractJobResult implements IOReadableWritable {
/**
* An optional description which can provide further information in case of an error.
*/
- private String description = null;
+ private String description = "";
/**
* Constructs a new abstract job result object and sets the description.
@@ -70,7 +65,7 @@ public abstract class AbstractJobResult implements IOReadableWritable {
* @param description
* the optional error description
*/
- public AbstractJobResult(final ReturnCode returnCode, final String description) {
+ public AbstractJobResult(ReturnCode returnCode, String description) {
this.returnCode = returnCode;
this.description = description;
}
@@ -79,8 +74,7 @@ public abstract class AbstractJobResult implements IOReadableWritable {
* Construct a new abstract job result object. This constructor is required
* for the deserialization process.
*/
- public AbstractJobResult() {
- }
+ public AbstractJobResult() {}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 79da72f..028713d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.client;
import java.io.IOException;
@@ -45,29 +44,19 @@ import org.apache.flink.util.StringUtils;
/**
* The job client is able to submit, control, and abort jobs.
- * <p>
- * This class is thread-safe.
*/
public class JobClient {
- /**
- * The logging object used for debugging.
- */
+ /** The logging object used for debugging. */
private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);
- /**
- * The job management server stub.
- */
+ /** The job management server stub.*/
private final JobManagementProtocol jobSubmitClient;
- /**
- * The accumulator protocol stub to request accumulators from JobManager
- */
+ /** The accumulator protocol stub to request accumulators from JobManager */
private AccumulatorProtocol accumulatorProtocolProxy;
- /**
- * The job graph assigned with this job client.
- */
+ /** The job graph assigned with this job client. */
private final JobGraph jobGraph;
/**
@@ -121,7 +110,7 @@ public class JobClient {
/**
* Constructs a new job client object and instantiates a local
- * RPC proxy for the {@link JobSubmissionProtocol}.
+ * RPC proxy for the JobSubmissionProtocol
*
* @param jobGraph
* the job graph to run
@@ -134,7 +123,7 @@ public class JobClient {
/**
* Constructs a new job client object and instantiates a local
- * RPC proxy for the {@link JobSubmissionProtocol}.
+ * RPC proxy for the JobSubmissionProtocol
*
* @param jobGraph
* the job graph to run
@@ -160,7 +149,7 @@ public class JobClient {
/**
* Constructs a new job client object and instantiates a local
- * RPC proxy for the {@link JobSubmissionProtocol}.
+ * RPC proxy for the JobSubmissionProtocol
*
* @param jobGraph
* the job graph to run
@@ -335,7 +324,7 @@ public class JobClient {
if (event instanceof JobEvent) {
final JobEvent jobEvent = (JobEvent) event;
final JobStatus jobStatus = jobEvent.getCurrentJobStatus();
- if (jobStatus == JobStatus.SCHEDULED) {
+ if (jobStatus == JobStatus.RUNNING) {
startTimestamp = jobEvent.getTimestamp();
}
if (jobStatus == JobStatus.FINISHED) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java
index c755dc6..53319d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java
@@ -16,14 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.client;
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
/**
* A <code>JobSubmissionResult</code> is used to report the results
* of a job submission. It contains a return code and a description.
@@ -53,17 +47,4 @@ public class JobSubmissionResult extends AbstractJobResult {
public JobSubmissionResult() {
super();
}
-
-
- @Override
- public void read(final DataInputView in) throws IOException {
- super.read(in);
- }
-
-
- @Override
- public void write(final DataOutputView out) throws IOException {
- super.write(out);
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
index f89e999..b4a38f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.deployment;
import java.io.IOException;
@@ -24,43 +23,30 @@ import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge2;
import org.apache.flink.runtime.io.network.channels.ChannelID;
/**
* A channel deployment descriptor contains all the information necessary to deploy either an input or an output channel
* as part of a task on a task manager.
- * <p>
- * This class is not thread-safe in general.
- *
*/
public final class ChannelDeploymentDescriptor implements IOReadableWritable {
- /**
- * The ID of the output channel.
- */
+ /** The ID of the output channel. */
private final ChannelID outputChannelID;
- /**
- * The ID of the input channel.
- */
+ /** The ID of the input channel. */
private final ChannelID inputChannelID;
/**
* Constructs a new channel deployment descriptor.
*
- * @param outputChannelID
- * the ID of the output channel
- * @param inputChannelID
- * the ID of the input channel
+ * @param outputChannelID The ID of the output channel
+ * @param inputChannelID The ID of the input channel
*/
- public ChannelDeploymentDescriptor(final ChannelID outputChannelID, final ChannelID inputChannelID) {
-
- if (outputChannelID == null) {
- throw new IllegalArgumentException("Argument outputChannelID must not be null");
- }
-
- if (inputChannelID == null) {
- throw new IllegalArgumentException("Argument inputChannelID must not be null");
+ public ChannelDeploymentDescriptor(ChannelID outputChannelID, ChannelID inputChannelID) {
+ if (outputChannelID == null || inputChannelID == null) {
+ throw new IllegalArgumentException("Channel IDs must not be null");
}
this.outputChannelID = outputChannelID;
@@ -71,23 +57,20 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable {
* Default constructor for serialization/deserialization.
*/
public ChannelDeploymentDescriptor() {
-
this.outputChannelID = new ChannelID();
this.inputChannelID = new ChannelID();
}
@Override
- public void write(final DataOutputView out) throws IOException {
-
+ public void write(DataOutputView out) throws IOException {
this.outputChannelID.write(out);
this.inputChannelID.write(out);
}
@Override
- public void read(final DataInputView in) throws IOException {
-
+ public void read(DataInputView in) throws IOException {
this.outputChannelID.read(in);
this.inputChannelID.read(in);
}
@@ -98,7 +81,6 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable {
* @return the output channel ID attached to this deployment descriptor
*/
public ChannelID getOutputChannelID() {
-
return this.outputChannelID;
}
@@ -108,7 +90,12 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable {
* @return the input channel ID attached to this deployment descriptor
*/
public ChannelID getInputChannelID() {
-
return this.inputChannelID;
}
+
+ // --------------------------------------------------------------------------------------------
+
+ public static ChannelDeploymentDescriptor fromExecutionEdge(ExecutionEdge2 edge) {
+ return new ChannelDeploymentDescriptor(edge.getOutputChannelId(), edge.getInputChannelId());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
index dfec497..e4a447f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
@@ -16,74 +16,36 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.deployment;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.io.network.gates.GateID;
-import org.apache.flink.runtime.util.EnumUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge2;
/**
- * A gate deployment descriptor contains all the information necessary to deploy either an input or an output gate as
- * part of a task on a task manager.
- * <p>
- * This class is not thread-safe in general.
- *
+ * A gate deployment descriptor contains the deployment descriptors for the channels associated with that gate.
*/
public final class GateDeploymentDescriptor implements IOReadableWritable {
- /**
- * The ID of the gate.
- */
- private final GateID gateID;
-
- /**
- * The channel type of the gate.
- */
- private ChannelType channelType;
-
- /**
- * The list of channel deployment descriptors attached to this gate.
- */
+ /** The list of channel deployment descriptors attached to this gate. */
private final List<ChannelDeploymentDescriptor> channels;
/**
* Constructs a new gate deployment descriptor
*
- * @param gateID
- * the ID of the gate
- * @param channelType
- * the channel type of the gate
- * @param compressionLevel
- * the compression level of the gate
* @param channels
* the list of channel deployment descriptors attached to this gate
*/
- public GateDeploymentDescriptor(final GateID gateID, final ChannelType channelType,
- List<ChannelDeploymentDescriptor> channels) {
-
- if (gateID == null) {
- throw new IllegalArgumentException("Argument gateID must no be null");
- }
-
- if (channelType == null) {
- throw new IllegalArgumentException("Argument channelType must no be null");
- }
-
+ public GateDeploymentDescriptor(List<ChannelDeploymentDescriptor> channels) {
if (channels == null) {
- throw new IllegalArgumentException("Argument channels must no be null");
+ throw new NullPointerException();
}
- this.gateID = gateID;
- this.channelType = channelType;
this.channels = channels;
}
@@ -91,71 +53,49 @@ public final class GateDeploymentDescriptor implements IOReadableWritable {
* Default constructor for serialization/deserialization.
*/
public GateDeploymentDescriptor() {
-
- this.gateID = new GateID();
- this.channelType = null;
this.channels = new ArrayList<ChannelDeploymentDescriptor>();
}
-
+
+ public List<ChannelDeploymentDescriptor> getChannels() {
+ return channels;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
@Override
public void write(final DataOutputView out) throws IOException {
-
- this.gateID.write(out);
- EnumUtils.writeEnum(out, channelType);
out.writeInt(this.channels.size());
- final Iterator<ChannelDeploymentDescriptor> it = this.channels.iterator();
- while (it.hasNext()) {
- it.next().write(out);
+ for (ChannelDeploymentDescriptor cdd : this.channels) {
+ cdd.write(out);
}
}
-
@Override
public void read(final DataInputView in) throws IOException {
-
- this.gateID.read(in);
- this.channelType = EnumUtils.readEnum(in, ChannelType.class);
final int nocdd = in.readInt();
for (int i = 0; i < nocdd; ++i) {
- final ChannelDeploymentDescriptor cdd = new ChannelDeploymentDescriptor();
+ ChannelDeploymentDescriptor cdd = new ChannelDeploymentDescriptor();
cdd.read(in);
this.channels.add(cdd);
}
}
-
- /**
- * Returns the ID of the gate.
- *
- * @return the ID of the gate
- */
- public GateID getGateID() {
-
- return this.gateID;
- }
-
- /**
- * Returns the channel type of the gate.
- *
- * @return the channel type of the gate
- */
- public ChannelType getChannelType() {
-
- return this.channelType;
- }
-
- /**
- * Returns the number of channel deployment descriptors attached to this gate descriptor.
- *
- * @return the number of channel deployment descriptors
- */
- public int getNumberOfChannelDescriptors() {
-
- return this.channels.size();
+
+ // --------------------------------------------------------------------------------------------
+
+ public static GateDeploymentDescriptor fromEdges(List<ExecutionEdge2> edges) {
+ List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(edges.size());
+ for (ExecutionEdge2 edge : edges) {
+ channels.add(ChannelDeploymentDescriptor.fromExecutionEdge(edge));
+ }
+ return new GateDeploymentDescriptor(channels);
}
-
- public ChannelDeploymentDescriptor getChannelDescriptor(final int index) {
-
- return this.channels.get(index);
+
+ public static GateDeploymentDescriptor fromEdges(ExecutionEdge2[] edges) {
+ List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(edges.length);
+ for (ExecutionEdge2 edge : edges) {
+ channels.add(ChannelDeploymentDescriptor.fromExecutionEdge(edge));
+ }
+ return new GateDeploymentDescriptor(channels);
}
}