You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/01/06 11:45:30 UTC
[3/4] incubator-flink git commit: [FLINK-1349] [runtime] Various
fixes concerning Akka - Remove obsolete code from old IPC net utils - Smaller
Writable/IOReadableWritable serialization buffer start size (most messages are
rather small) - For message lo
[FLINK-1349] [runtime] Various fixes concerning Akka
- Remove obsolete code from old IPC net utils
- Smaller Writable/IOReadableWritable serialization buffer start size (most messages are rather small)
- For message logging, make system calls (timestamps) only in debug mode
- Clean up warnings / code simplifications
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/972a7b01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/972a7b01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/972a7b01
Branch: refs/heads/master
Commit: 972a7b01980b4b06d4893df6aa6aebd4e26e5990
Parents: 24c4736
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Dec 28 10:55:21 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 6 11:07:02 2015 +0100
----------------------------------------------------------------------
.../main/java/org/apache/flink/yarn/Utils.java | 2 +-
.../flink/configuration/ConfigConstants.java | 11 +-
.../java/org/apache/flink/core/fs/Path.java | 23 +-
flink-dist/pom.xml | 2 +-
flink-runtime/pom.xml | 1 -
.../runtime/accumulators/AccumulatorEvent.java | 2 +
.../org/apache/flink/runtime/blob/BlobKey.java | 88 ++--
.../deployment/ChannelDeploymentDescriptor.java | 2 +
.../deployment/GateDeploymentDescriptor.java | 2 +
.../deployment/TaskDeploymentDescriptor.java | 11 +
.../runtime/fs/hdfs/DistributedFileStatus.java | 10 +-
.../network/ConnectionInfoLookupResponse.java | 4 +-
.../runtime/io/network/RemoteReceiver.java | 2 +
.../apache/flink/runtime/jobgraph/JobGraph.java | 18 +-
.../runtime/jobmanager/scheduler/SubSlot.java | 3 +
.../runtime/jobmanager/web/WebInfoServer.java | 11 +-
.../org/apache/flink/runtime/net/NetUtils.java | 317 ++-----------
.../flink/runtime/net/SocketIOWithTimeout.java | 457 -------------------
.../flink/runtime/net/SocketInputStream.java | 179 --------
.../flink/runtime/net/SocketOutputStream.java | 222 ---------
.../apache/flink/runtime/ActorLogMessages.scala | 16 +-
.../IOReadableWritableSerializer.scala | 2 +-
.../akka/serialization/WritableSerializer.scala | 2 +-
.../flink/runtime/taskmanager/TaskManager.scala | 341 +++++++-------
.../ExecutionGraphDeploymentTest.java | 2 +-
.../ExecutionVertexCancelTest.java | 8 +-
.../runtime/operators/DataSinkTaskTest.java | 17 +-
.../runtime/operators/DataSourceTaskTest.java | 3 +-
.../apache/flink/test/util/TestBaseUtils.java | 17 +
flink-tests/pom.xml | 1 -
.../CustomCompensatableDanglingPageRank.java | 4 +-
...mpensatableDanglingPageRankWithCombiner.java | 4 +-
.../javaApiOperators/GroupReduceITCase.java | 64 +--
33 files changed, 372 insertions(+), 1476 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 6b541bf..bd5659a 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -121,7 +121,7 @@ public class Utils {
// chain-in a new classloader
URL fileUrl = null;
try {
- fileUrl = path.toURL();
+ fileUrl = path.toURI().toURL();
} catch (MalformedURLException e) {
throw new RuntimeException("Erroneous config file path", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 2d97504..df96339 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -295,7 +295,7 @@ public final class ConfigConstants {
public static final String AKKA_STARTUP_TIMEOUT = "akka.startup-timeout";
/**
- * Hearbeat interval of the transport failure detector
+ * Heartbeat interval of the transport failure detector
*/
public static final String AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "akka.transport.heartbeat.interval";
@@ -466,11 +466,6 @@ public final class ConfigConstants {
public static final int DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = -1;
/**
- * The default interval for TaskManager heart beats (5000 msecs).
- */
- public static final int DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL = 5000;
-
- /**
* Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
*/
public static final boolean DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = false;
@@ -559,8 +554,8 @@ public final class ConfigConstants {
/**
* The default directory to store temporary objects (e.g. during file uploads).
*/
- public static final String DEFAULT_WEB_TMP_DIR = System.getProperty("java.io.tmpdir") == null ? "/tmp" : System
- .getProperty("java.io.tmpdir");
+ public static final String DEFAULT_WEB_TMP_DIR =
+ System.getProperty("java.io.tmpdir") == null ? "/tmp" : System.getProperty("java.io.tmpdir");
/**
* The default directory for temporary plan dumps from the web frontend.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index 950a38b..a104d86 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -16,16 +16,12 @@
* limitations under the License.
*/
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
+/* This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership.
- */
+ * additional information regarding copyright ownership. */
package org.apache.flink.core.fs;
-import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
@@ -501,19 +497,4 @@ public class Path implements IOReadableWritable, Serializable {
}
}
-
- public static String constructTestPath(Class<?> forClass, String folder) {
- // we create test path that depends on class to prevent name clashes when two tests
- // create temp files with the same name
- String path = System.getProperty("java.io.tmpdir");
- if (!(path.endsWith("/") || path.endsWith("\\")) ) {
- path += System.getProperty("file.separator");
- }
- path += (forClass.getName() + "-" + folder);
- return path;
- }
-
- public static String constructTestURI(Class<?> forClass, String folder) {
- return new File(constructTestPath(forClass, folder)).toURI().toString();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index f73a000..e753a05 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -210,7 +210,7 @@ under the License.
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
- <version>2.4</version>
+ <version>2.4</version><!--$NO-MVN-MAN-VER$-->
<executions>
<!-- yarn bin directory -->
<execution>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index b87977e..5a9ca8a 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -130,7 +130,6 @@ under the License.
<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_2.10</artifactId>
- <version>3.2.0</version>
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
index 761248b..3fed259 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
@@ -39,6 +39,8 @@ import org.apache.flink.util.InstantiationUtil;
*/
public class AccumulatorEvent implements Serializable {
+ private static final long serialVersionUID = 8965894516006882735L;
+
private JobID jobID;
private Map<String, Accumulator<?, ?>> accumulators;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
index c12fd23..e3d237d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
@@ -32,19 +32,16 @@ import java.util.Arrays;
*/
public final class BlobKey implements Serializable, Comparable<BlobKey> {
- /**
- * Array of hex characters to facilitate fast toString() method.
- */
+ private static final long serialVersionUID = 3847117712521785209L;
+
+ /** Array of hex characters to facilitate fast toString() method. */
private static final char[] HEX_ARRAY = "0123456789abcdef".toCharArray();
- /**
- * Size of the internal BLOB key in bytes.
- */
+ /** Size of the internal BLOB key in bytes. */
private static final int SIZE = 20;
- /**
- * The byte buffer storing the actual key data.
- */
+
+ /** The byte buffer storing the actual key data. */
private final byte[] key;
/**
@@ -60,8 +57,7 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
* @param key
* the actual key data
*/
- BlobKey(final byte[] key) {
-
+ BlobKey(byte[] key) {
if (key.length != SIZE) {
throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
}
@@ -70,8 +66,15 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
}
/**
- * {@inheritDoc}
+ * Adds the BLOB key to the given {@link MessageDigest}.
+ *
+ * @param md
+ * the message digest to add the BLOB key to
*/
+ public void addToMessageDigest(MessageDigest md) {
+ md.update(this.key);
+ }
+
@Override
public boolean equals(final Object obj) {
@@ -84,17 +87,11 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
return Arrays.equals(this.key, bk.key);
}
- /**
- * {@inheritDoc}
- */
@Override
public int hashCode() {
return Arrays.hashCode(this.key);
}
- /**
- * {@inheritDoc}
- */
@Override
public String toString() {
// from http://stackoverflow.com/questions/9655181/convert-from-byte-array-to-hex-string-in-java
@@ -108,6 +105,26 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
return new String(hexChars);
}
+ @Override
+ public int compareTo(BlobKey o) {
+
+ final byte[] aarr = this.key;
+ final byte[] barr = o.key;
+ final int len = Math.min(aarr.length, barr.length);
+
+ for (int i = 0; i < len; ++i) {
+ final int a = (aarr[i] & 0xff);
+ final int b = (barr[i] & 0xff);
+ if (a != b) {
+ return a - b;
+ }
+ }
+
+ return aarr.length - barr.length;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
/**
* Auxiliary method to read a BLOB key from an input stream.
*
@@ -117,7 +134,7 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
* @throws IOException
* throw if an I/O error occurs while reading from the input stream
*/
- static BlobKey readFromInputStream(final InputStream inputStream) throws IOException {
+ static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
final byte[] key = new byte[BlobKey.SIZE];
@@ -142,39 +159,6 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
* thrown if an I/O error occurs while writing the BLOB key
*/
void writeToOutputStream(final OutputStream outputStream) throws IOException {
-
outputStream.write(this.key);
}
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int compareTo(final BlobKey o) {
-
- final byte[] aarr = this.key;
- final byte[] barr = o.key;
- final int len = Math.min(aarr.length, barr.length);
-
- for (int i = 0; i < len; ++i) {
- final int a = (aarr[i] & 0xff);
- final int b = (barr[i] & 0xff);
- if (a != b) {
- return a - b;
- }
- }
-
- return aarr.length - barr.length;
- }
-
- /**
- * Adds the BLOB key to the given {@link MessageDigest}.
- *
- * @param md
- * the message digest to add the BLOB key to
- */
- public void addToMessageDigest(final MessageDigest md) {
-
- md.update(this.key);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
index 2c90091..6fff4a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
@@ -29,6 +29,8 @@ import org.apache.flink.runtime.io.network.channels.ChannelID;
*/
public final class ChannelDeploymentDescriptor implements Serializable {
+ private static final long serialVersionUID = -4079084629425460213L;
+
/** The ID of the output channel. */
private final ChannelID outputChannelID;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
index 1f73546..c676023 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
@@ -29,6 +29,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionEdge;
*/
public final class GateDeploymentDescriptor implements Serializable {
+ private static final long serialVersionUID = -8433936680266802364L;
+
/** The list of channel deployment descriptors attached to this gate. */
private final List<ChannelDeploymentDescriptor> channels;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 20157d9..75a3c9b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -34,6 +34,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
*/
public final class TaskDeploymentDescriptor implements Serializable {
+ private static final long serialVersionUID = -3233562176034358530L;
+
/** The ID of the job the tasks belongs to. */
private final JobID jobID;
@@ -192,6 +194,15 @@ public final class TaskDeploymentDescriptor implements Serializable {
public int getCurrentNumberOfSubtasks() {
return this.currentNumberOfSubtasks;
}
+
+ /**
+ * Gets the number of the slot into which the task is to be deployed.
+ *
+ * @return The number of the target slot.
+ */
+ public int getTargetSlotNumber() {
+ return targetSlotNumber;
+ }
/**
* Returns the configuration of the job the task belongs to.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileStatus.java
index 6b819dc..d8c9873 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileStatus.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.core.fs.FileStatus;
@@ -25,7 +24,6 @@ import org.apache.flink.core.fs.Path;
/**
* Concrete implementation of the {@link FileStatus} interface for the
* Hadoop Distribution File System.
- *
*/
public final class DistributedFileStatus implements FileStatus {
@@ -43,13 +41,11 @@ public final class DistributedFileStatus implements FileStatus {
@Override
public long getLen() {
-
return fileStatus.getLen();
}
@Override
public long getBlockSize() {
-
long blocksize = fileStatus.getBlockSize();
if (blocksize > fileStatus.getLen()) {
return fileStatus.getLen();
@@ -60,19 +56,16 @@ public final class DistributedFileStatus implements FileStatus {
@Override
public long getAccessTime() {
-
return fileStatus.getAccessTime();
}
@Override
public long getModificationTime() {
-
return fileStatus.getModificationTime();
}
@Override
public short getReplication() {
-
return fileStatus.getReplication();
}
@@ -82,13 +75,12 @@ public final class DistributedFileStatus implements FileStatus {
@Override
public Path getPath() {
-
return new Path(fileStatus.getPath().toString());
}
+ @SuppressWarnings("deprecation")
@Override
public boolean isDir() {
-
return fileStatus.isDir();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
index 547ce70..51a0f94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.network;
import java.io.Serializable;
@@ -25,6 +24,9 @@ import org.apache.flink.runtime.io.network.channels.ChannelID;
public class ConnectionInfoLookupResponse implements Serializable {
+ private static final long serialVersionUID = 3961171754642077522L;
+
+
private enum ReturnCode {
NOT_FOUND, FOUND_AND_RECEIVER_READY, FOUND_BUT_RECEIVER_NOT_READY, JOB_IS_ABORTING
};
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
index 78dcbae..436d07d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
@@ -32,6 +32,8 @@ import org.apache.flink.core.memory.DataOutputView;
*/
public final class RemoteReceiver implements IOReadableWritable, Serializable {
+ private static final long serialVersionUID = 4304924747853162443L;
+
/**
* The address of the connection to the remote TaskManager.
*/
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 3497824..a288357 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -43,10 +43,12 @@ import org.apache.flink.runtime.blob.BlobKey;
*/
public class JobGraph implements Serializable {
+ private static final long serialVersionUID = 1L;
+
// --------------------------------------------------------------------------------------------
// Members that define the structure / topology of the graph
// --------------------------------------------------------------------------------------------
-
+
/** List of task vertices included in this job graph. */
private final Map<JobVertexID, AbstractJobVertex> taskVertices = new LinkedHashMap<JobVertexID, AbstractJobVertex>();
@@ -54,7 +56,7 @@ public class JobGraph implements Serializable {
private final Configuration jobConfiguration = new Configuration();
/** Set of JAR files required to run this job. */
- private final transient List<Path> userJars = new ArrayList<Path>();
+ private final List<Path> userJars = new ArrayList<Path>();
/** Set of blob keys identifying the JAR files required to run this job. */
private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();
@@ -357,7 +359,6 @@ public class JobGraph implements Serializable {
* @return set of BLOB keys referring to the JAR files required to run this job
*/
public List<BlobKey> getUserJarBlobKeys() {
-
return this.userJarBlobKeys;
}
@@ -369,15 +370,13 @@ public class JobGraph implements Serializable {
* @throws IOException
* thrown if an I/O error occurs during the upload
*/
- public void uploadRequiredJarFiles(final InetSocketAddress serverAddress) throws IOException {
-
+ public void uploadRequiredJarFiles(InetSocketAddress serverAddress) throws IOException {
if (this.userJars.isEmpty()) {
return;
}
BlobClient bc = null;
try {
-
bc = new BlobClient(serverAddress);
for (final Path jar : this.userJars) {
@@ -388,14 +387,15 @@ public class JobGraph implements Serializable {
is = fs.open(jar);
final BlobKey key = bc.put(is);
this.userJarBlobKeys.add(key);
- } finally {
+ }
+ finally {
if (is != null) {
is.close();
}
}
}
-
- } finally {
+ }
+ finally {
if (bc != null) {
bc.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
index 0b19a5f..92d457b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
@@ -23,6 +23,9 @@ import org.apache.flink.runtime.instance.AllocatedSlot;
public class SubSlot extends AllocatedSlot {
+ private static final long serialVersionUID = 1361615219044538497L;
+
+
private final SharedSlot sharedSlot;
private final AbstractID groupId;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 7e67603..24dbaf7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -85,15 +85,16 @@ public class WebInfoServer {
*/
public WebInfoServer(Configuration config, ActorRef jobmanager,
ActorRef archive, FiniteDuration timeout) throws IOException {
- this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
-
- this.timeout = timeout;
-
+
// if no explicit configuration is given, use the global configuration
if (config == null) {
config = GlobalConfiguration.getConfiguration();
}
+
+ this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+
+ this.timeout = timeout;
// get base path of Flink installation
final String basePath = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, "");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 2795158..ec2633c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -16,18 +16,9 @@
* limitations under the License.
*/
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership.
- */
-
package org.apache.flink.runtime.net;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -35,286 +26,15 @@ import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import javax.net.SocketFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NetUtils {
+
private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
- private static Map<String, String> hostToResolved = new HashMap<String, String>();
-
- public static SocketFactory getSocketFactory() {
-
- return getDefaultSocketFactory();
- }
-
- public static SocketFactory getDefaultSocketFactory() {
-
- return SocketFactory.getDefault();
- }
-
- /**
- * Util method to build socket addr from either:
- * <host>:<post>
- * <fs>://<host>:<port>/<path>
- */
- public static InetSocketAddress createSocketAddr(String target) {
- return createSocketAddr(target, -1);
- }
-
- /**
- * Util method to build socket addr from either:
- * <host>
- * <host>:<post>
- * <fs>://<host>:<port>/<path>
- */
- public static InetSocketAddress createSocketAddr(String target, int defaultPort) {
- int colonIndex = target.indexOf(':');
- if (colonIndex < 0 && defaultPort == -1) {
- throw new RuntimeException("Not a host:port pair: " + target);
- }
- String hostname = "";
- int port = -1;
- if (!target.contains("/")) {
- if (colonIndex == -1) {
- hostname = target;
- } else {
- // must be the old style <host>:<port>
- hostname = target.substring(0, colonIndex);
- port = Integer.parseInt(target.substring(colonIndex + 1));
- }
- } else {
- // a new uri
- try {
- URI addr = new URI(target);
- hostname = addr.getHost();
- port = addr.getPort();
- } catch (URISyntaxException use) {
- LOG.error("Invalid URI syntax.", use);
- }
- }
-
- if (port == -1) {
- port = defaultPort;
- }
-
- if (getStaticResolution(hostname) != null) {
- hostname = getStaticResolution(hostname);
- }
- return new InetSocketAddress(hostname, port);
- }
-
- /**
- * Adds a static resolution for host. This can be used for setting up
- * hostnames with names that are fake to point to a well known host. For e.g.
- * in some testcases we require to have daemons with different hostnames
- * running on the same machine. In order to create connections to these
- * daemons, one can set up mappings from those hostnames to "localhost".
- * {@link NetUtils#getStaticResolution(String)} can be used to query for
- * the actual hostname.
- *
- * @param host
- * @param resolvedName
- */
- public static void addStaticResolution(String host, String resolvedName) {
- synchronized (hostToResolved) {
- hostToResolved.put(host, resolvedName);
- }
- }
-
- /**
- * Retrieves the resolved name for the passed host. The resolved name must
- * have been set earlier using {@link NetUtils#addStaticResolution(String, String)}
- *
- * @param host
- * @return the resolution
- */
- public static String getStaticResolution(String host) {
- synchronized (hostToResolved) {
- return hostToResolved.get(host);
- }
- }
-
- /**
- * This is used to get all the resolutions that were added using
- * {@link NetUtils#addStaticResolution(String, String)}. The return
- * value is a List each element of which contains an array of String
- * of the form String[0]=hostname, String[1]=resolved-hostname
- *
- * @return the list of resolutions
- */
- public static List<String[]> getAllStaticResolutions() {
- synchronized (hostToResolved) {
- Set<Entry<String, String>> entries = hostToResolved.entrySet();
- if (entries.size() == 0) {
- return null;
- }
- List<String[]> l = new ArrayList<String[]>(entries.size());
- for (Entry<String, String> e : entries) {
- l.add(new String[] { e.getKey(), e.getValue() });
- }
- return l;
- }
- }
-
- /**
- * Same as getInputStream(socket, socket.getSoTimeout()).<br>
- * <br>
- * From documentation for {@link #getInputStream(Socket, long)}:<br>
- * Returns InputStream for the socket. If the socket has an associated
- * SocketChannel then it returns a {@link SocketInputStream} with the given timeout. If the socket does not
- * have a channel, {@link Socket#getInputStream()} is returned. In the later
- * case, the timeout argument is ignored and the timeout set with {@link Socket#setSoTimeout(int)} applies for
- * reads.<br>
- * <br>
- * Any socket created using socket factories returned by {@link #NetUtils},
- * must use this interface instead of {@link Socket#getInputStream()}.
- *
- * @see #getInputStream(Socket, long)
- * @param socket
- * @return InputStream for reading from the socket.
- * @throws IOException
- */
- public static InputStream getInputStream(Socket socket) throws IOException {
- return getInputStream(socket, socket.getSoTimeout());
- }
-
- /**
- * Returns InputStream for the socket. If the socket has an associated
- * SocketChannel then it returns a {@link SocketInputStream} with the given timeout. If the socket does not
- * have a channel, {@link Socket#getInputStream()} is returned. In the later
- * case, the timeout argument is ignored and the timeout set with {@link Socket#setSoTimeout(int)} applies for
- * reads.<br>
- * <br>
- * Any socket created using socket factories returned by {@link #NetUtils},
- * must use this interface instead of {@link Socket#getInputStream()}.
- *
- * @see Socket#getChannel()
- * @param socket
- * @param timeout
- * timeout in milliseconds. This may not always apply. zero
- * for waiting as long as necessary.
- * @return InputStream for reading from the socket.
- * @throws IOException
- */
- public static InputStream getInputStream(Socket socket, long timeout) throws IOException {
- return (socket.getChannel() == null) ? socket.getInputStream() : new SocketInputStream(socket, timeout);
- }
-
- /**
- * Same as getOutputStream(socket, 0). Timeout of zero implies write will
- * wait until data is available.<br>
- * <br>
- * From documentation for {@link #getOutputStream(Socket, long)} : <br>
- * Returns OutputStream for the socket. If the socket has an associated
- * SocketChannel then it returns a {@link SocketOutputStream} with the given timeout. If the socket does not
- * have a channel, {@link Socket#getOutputStream()} is returned. In the later
- * case, the timeout argument is ignored and the write will wait until
- * data is available.<br>
- * <br>
- * Any socket created using socket factories returned by {@link #NetUtils},
- * must use this interface instead of {@link Socket#getOutputStream()}.
- *
- * @see #getOutputStream(Socket, long)
- * @param socket
- * @return OutputStream for writing to the socket.
- * @throws IOException
- */
- public static OutputStream getOutputStream(Socket socket) throws IOException {
- return getOutputStream(socket, 0);
- }
-
- /**
- * Returns OutputStream for the socket. If the socket has an associated
- * SocketChannel then it returns a {@link SocketOutputStream} with the given timeout. If the socket does not
- * have a channel, {@link Socket#getOutputStream()} is returned. In the later
- * case, the timeout argument is ignored and the write will wait until
- * data is available.<br>
- * <br>
- * Any socket created using socket factories returned by {@link #NetUtils},
- * must use this interface instead of {@link Socket#getOutputStream()}.
- *
- * @see Socket#getChannel()
- * @param socket
- * @param timeout
- * timeout in milliseconds. This may not always apply. zero
- * for waiting as long as necessary.
- * @return OutputStream for writing to the socket.
- * @throws IOException
- */
- public static OutputStream getOutputStream(Socket socket, long timeout) throws IOException {
- return (socket.getChannel() == null) ? socket.getOutputStream() : new SocketOutputStream(socket, timeout);
- }
-
- public static void connect(Socket socket, SocketAddress endpoint, int timeout) throws IOException {
- if (socket == null || endpoint == null || timeout < 0) {
- throw new IllegalArgumentException("Illegal argument for connect()");
- }
-
- SocketChannel ch = socket.getChannel();
-
- if (ch == null) {
- // let the default implementation handle it.
- socket.connect(endpoint, timeout);
- } else {
- SocketIOWithTimeout.connect(ch, endpoint, timeout);
- }
- }
-
- /**
- * Given a string representation of a host, return its ip address
- * in textual presentation.
- *
- * @param name
- * a string representation of a host:
- * either a textual representation its IP address or its host name
- * @return its IP address in the string format
- */
- public static String normalizeHostName(String name) {
- if (Character.digit(name.charAt(0), 16) != -1) { // it is an IP
- return name;
- } else {
- try {
- InetAddress ipAddress = InetAddress.getByName(name);
- return ipAddress.getHostAddress();
- } catch (UnknownHostException e) {
- return name;
- }
- }
- }
-
- /**
- * Given a collection of string representation of hosts, return a list of
- * corresponding IP addresses in the textual representation.
- *
- * @param names
- * a collection of string representations of hosts
- * @return a list of corresponding IP addresses in the string format
- * @see #normalizeHostName(String)
- */
- public static List<String> normalizeHostNames(Collection<String> names) {
- List<String> hostNames = new ArrayList<String>(names.size());
- for (String name : names) {
- hostNames.add(normalizeHostName(name));
- }
- return hostNames;
- }
-
/**
* The states of address detection mechanism.
* There is only a state transition if the current state failed to determine the address.
@@ -344,11 +64,14 @@ public class NetUtils {
while (true) {
Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
+
while (e.hasMoreElements()) {
NetworkInterface n = e.nextElement();
Enumeration<InetAddress> ee = n.getInetAddresses();
+
while (ee.hasMoreElements()) {
InetAddress i = ee.nextElement();
+
switch (strategy) {
case ADDRESS:
if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
@@ -358,6 +81,7 @@ public class NetUtils {
}
}
break;
+
case FAST_CONNECT:
case SLOW_CONNECT:
boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
@@ -366,17 +90,22 @@ public class NetUtils {
return i;
}
break;
+
case HEURISTIC:
- LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" +
- "isLinkLocalAddress:" + i.isLinkLocalAddress() +" " +
- "isLoopbackAddress:" + i.isLoopbackAddress() + ".");
- if(!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" +
+ " isLinkLocalAddress:" + i.isLinkLocalAddress() +
+ " isLoopbackAddress:" + i.isLoopbackAddress() + ".");
+ }
+
+ if (!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){
LOG.warn("Hostname " + InetAddress.getLocalHost().getHostName() + " resolves to " +
"loopback address. Using instead " + i.getHostAddress() + " on network " +
"interface " + n.getName() + ".");
return i;
}
break;
+
default:
throw new RuntimeException("Unkown address detection strategy: " + strategy);
}
@@ -391,11 +120,11 @@ public class NetUtils {
strategy = AddressDetectionState.SLOW_CONNECT;
break;
case SLOW_CONNECT:
- if(!InetAddress.getLocalHost().isLoopbackAddress()){
+ if (!InetAddress.getLocalHost().isLoopbackAddress()) {
LOG.info("Heuristically taking " + InetAddress.getLocalHost() + " as own " +
"IP address.");
return InetAddress.getLocalHost();
- }else {
+ } else {
strategy = AddressDetectionState.HEURISTIC;
break;
}
@@ -437,7 +166,8 @@ public class NetUtils {
LOG.debug("Failed with exception", ex);
}
connectable = false;
- } finally {
+ }
+ finally {
if (socket != null) {
socket.close();
}
@@ -454,13 +184,18 @@ public class NetUtils {
if (port != 0) {
return port;
}
- } catch (IOException e) {
- LOG.debug("Unable to allocate port " + e.getMessage(), e);
- } finally {
+ }
+ catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unable to allocate port " + e.getMessage(), e);
+ }
+ }
+ finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (Throwable t) {
+ // ignored
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketIOWithTimeout.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketIOWithTimeout.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketIOWithTimeout.java
deleted file mode 100644
index daecfa5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketIOWithTimeout.java
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership.
- */
-
-package org.apache.flink.runtime.net;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This supports input and output streams for a socket channels.
- * These streams can have a timeout.
- */
-abstract class SocketIOWithTimeout {
- // This is intentionally package private.
-
- static final Logger LOG = LoggerFactory.getLogger(SocketIOWithTimeout.class);
-
- private SelectableChannel channel;
-
- private long timeout;
-
- private boolean closed = false;
-
- private static SelectorPool selector = new SelectorPool();
-
- /*
- * A timeout value of 0 implies wait for ever.
- * We should have a value of timeout that implies zero wait.. i.e.
- * read or write returns immediately.
- * This will set channel to non-blocking.
- */
- SocketIOWithTimeout(SelectableChannel channel, long timeout)
- throws IOException {
- checkChannelValidity(channel);
-
- this.channel = channel;
- this.timeout = timeout;
- // Set non-blocking
- channel.configureBlocking(false);
- }
-
- void close() {
- closed = true;
- }
-
- boolean isOpen() {
- return !closed && channel.isOpen();
- }
-
- SelectableChannel getChannel() {
- return channel;
- }
-
- /**
- * Utility function to check if channel is ok.
- * Mainly to throw IOException instead of runtime exception
- * in case of mismatch. This mismatch can occur for many runtime
- * reasons.
- */
- static void checkChannelValidity(Object channel) throws IOException {
- if (channel == null) {
- /*
- * Most common reason is that original socket does not have a channel.
- * So making this an IOException rather than a RuntimeException.
- */
- throw new IOException("Channel is null. Check " + "how the channel or socket is created.");
- }
-
- if (!(channel instanceof SelectableChannel)) {
- throw new IOException("Channel should be a SelectableChannel");
- }
- }
-
- /**
- * Performs actual IO operations. This is not expected to block.
- *
- * @param buf
- * @return number of bytes (or some equivalent). 0 implies underlying
- * channel is drained completely. We will wait if more IO is
- * required.
- * @throws IOException
- */
- abstract int performIO(ByteBuffer buf) throws IOException;
-
- /**
- * Performs one IO and returns number of bytes read or written.
- * It waits up to the specified timeout. If the channel is
- * not read before the timeout, SocketTimeoutException is thrown.
- *
- * @param buf
- * buffer for IO
- * @param ops
- * Selection Ops used for waiting. Suggested values:
- * SelectionKey.OP_READ while reading and SelectionKey.OP_WRITE while
- * writing.
- * @return number of bytes read or written. negative implies end of stream.
- * @throws IOException
- */
- int doIO(ByteBuffer buf, int ops) throws IOException {
-
- /*
- * For now only one thread is allowed. If user want to read or write
- * from multiple threads, multiple streams could be created. In that
- * case multiple threads work as well as underlying channel supports it.
- */
- if (!buf.hasRemaining()) {
- throw new IllegalArgumentException("Buffer has no data left.");
- // or should we just return 0?
- }
-
- while (buf.hasRemaining()) {
- if (closed) {
- return -1;
- }
-
- try {
- int n = performIO(buf);
- if (n != 0) {
- // successful io or an error.
- return n;
- }
- } catch (IOException e) {
- if (!channel.isOpen()) {
- closed = true;
- }
- throw e;
- }
-
- // now wait for socket to be ready.
- int count = 0;
- try {
- count = selector.select(channel, ops, timeout);
- } catch (IOException e) { // unexpected IOException.
- closed = true;
- throw e;
- }
-
- if (count == 0) {
- throw new SocketTimeoutException(timeoutExceptionString(channel, timeout, ops));
- }
- // otherwise the socket should be ready for io.
- }
-
- return 0; // does not reach here.
- }
-
- /**
- * The contract is similar to {@link SocketChannel#connect(SocketAddress)} with a timeout.
- *
- * @see SocketChannel#connect(SocketAddress)
- * @param channel
- * - this should be a {@link SelectableChannel}
- * @param endpoint
- * @throws IOException
- */
- static void connect(SocketChannel channel, SocketAddress endpoint, int timeout) throws IOException {
-
- boolean blockingOn = channel.isBlocking();
- if (blockingOn) {
- channel.configureBlocking(false);
- }
-
- try {
- if (channel.connect(endpoint)) {
- return;
- }
-
- long timeoutLeft = timeout;
- long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout) : 0;
-
- while (true) {
- // we might have to call finishConnect() more than once
- // for some channels (with user level protocols)
-
- int ret = selector.select((SelectableChannel) channel, SelectionKey.OP_CONNECT, timeoutLeft);
-
- if (ret > 0 && channel.finishConnect()) {
- return;
- }
-
- if (ret == 0 || (timeout > 0 && (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
- throw new SocketTimeoutException(timeoutExceptionString(channel, timeout, SelectionKey.OP_CONNECT));
- }
- }
- } catch (IOException e) {
- // javadoc for SocketChannel.connect() says channel should be closed.
- try {
- channel.close();
- } catch (IOException ignored) {
- }
- throw e;
- } finally {
- if (blockingOn && channel.isOpen()) {
- channel.configureBlocking(true);
- }
- }
- }
-
- /**
- * This is similar to {@link #doIO(ByteBuffer, int)} except that it
- * does not perform any I/O. It just waits for the channel to be ready
- * for I/O as specified in ops.
- *
- * @param ops
- * Selection Ops used for waiting
- * @throws SocketTimeoutException
- * if select on the channel times out.
- * @throws IOException
- * if any other I/O error occurs.
- */
- void waitForIO(int ops) throws IOException {
-
- if (selector.select(channel, ops, timeout) == 0) {
- throw new SocketTimeoutException(timeoutExceptionString(channel, timeout, ops));
- }
- }
-
- private static String timeoutExceptionString(SelectableChannel channel, long timeout, int ops) {
-
- String waitingFor;
- switch (ops) {
-
- case SelectionKey.OP_READ:
- waitingFor = "read";
- break;
-
- case SelectionKey.OP_WRITE:
- waitingFor = "write";
- break;
-
- case SelectionKey.OP_CONNECT:
- waitingFor = "connect";
- break;
-
- default:
- waitingFor = "" + ops;
- }
-
- return timeout + " millis timeout while " + "waiting for channel to be ready for " + waitingFor + ". ch : "
- + channel;
- }
-
- /**
- * This maintains a pool of selectors. These selectors are closed
- * once they are idle (unused) for a few seconds.
- */
- private static class SelectorPool {
-
- private static class SelectorInfo {
- Selector selector;
-
- long lastActivityTime;
-
- LinkedList<SelectorInfo> queue;
-
- void close() {
- if (selector != null) {
- try {
- selector.close();
- } catch (IOException e) {
- LOG.warn("Unexpected exception while closing selector : " + e.toString());
- }
- }
- }
- }
-
- private static class ProviderInfo {
- SelectorProvider provider;
-
- LinkedList<SelectorInfo> queue; // lifo
-
- ProviderInfo next;
- }
-
- private static final long IDLE_TIMEOUT = 10 * 1000; // 10 seconds.
-
- private ProviderInfo providerList = null;
-
- /**
- * Waits on the channel with the given timeout using one of the
- * cached selectors. It also removes any cached selectors that are
- * idle for a few seconds.
- *
- * @param channel
- * @param ops
- * @param timeout
- * @return
- * @throws IOException
- */
- int select(SelectableChannel channel, int ops, long timeout) throws IOException {
-
- SelectorInfo info = get(channel);
-
- SelectionKey key = null;
- int ret = 0;
-
- try {
- while (true) {
- long start = (timeout == 0) ? 0 : System.currentTimeMillis();
-
- key = channel.register(info.selector, ops);
- ret = info.selector.select(timeout);
-
- if (ret != 0) {
- return ret;
- }
-
- /*
- * Sometimes select() returns 0 much before timeout for
- * unknown reasons. So select again if required.
- */
- if (timeout > 0) {
- timeout -= System.currentTimeMillis() - start;
- if (timeout <= 0) {
- return 0;
- }
- }
-
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedIOException("Interruped while waiting for " + "IO on channel " + channel
- + ". " + timeout + " millis timeout left.");
- }
- }
- } finally {
- if (key != null) {
- key.cancel();
- }
-
- // clear the canceled key.
- try {
- info.selector.selectNow();
- } catch (IOException e) {
- LOG.info("Unexpected Exception while clearing selector : " + e.toString());
- // don't put the selector back.
- info.close();
- return ret;
- }
-
- release(info);
- }
- }
-
- /**
- * Takes one selector from end of LRU list of free selectors.
- * If there are no selectors awailable, it creates a new selector.
- * Also invokes trimIdleSelectors().
- *
- * @param channel
- * @return
- * @throws IOException
- */
- private synchronized SelectorInfo get(SelectableChannel channel) throws IOException {
- SelectorInfo selInfo = null;
-
- SelectorProvider provider = channel.provider();
-
- // pick the list : rarely there is more than one provider in use.
- ProviderInfo pList = providerList;
- while (pList != null && pList.provider != provider) {
- pList = pList.next;
- }
- if (pList == null) {
- // LOG.info("Creating new ProviderInfo : " + provider.toString());
- pList = new ProviderInfo();
- pList.provider = provider;
- pList.queue = new LinkedList<SelectorInfo>();
- pList.next = providerList;
- providerList = pList;
- }
-
- LinkedList<SelectorInfo> queue = pList.queue;
-
- if (queue.isEmpty()) {
- Selector selector = provider.openSelector();
- selInfo = new SelectorInfo();
- selInfo.selector = selector;
- selInfo.queue = queue;
- } else {
- selInfo = queue.removeLast();
- }
-
- trimIdleSelectors(System.currentTimeMillis());
- return selInfo;
- }
-
- /**
- * puts selector back at the end of LRU list of free selectos.
- * Also invokes trimIdleSelectors().
- *
- * @param info
- */
- private synchronized void release(SelectorInfo info) {
- long now = System.currentTimeMillis();
- trimIdleSelectors(now);
- info.lastActivityTime = now;
- info.queue.addLast(info);
- }
-
- /**
- * Closes selectors that are idle for IDLE_TIMEOUT (10 sec). It does not
- * traverse the whole list, just over the one that have crossed
- * the timeout.
- */
- private void trimIdleSelectors(long now) {
- long cutoff = now - IDLE_TIMEOUT;
-
- for (ProviderInfo pList = providerList; pList != null; pList = pList.next) {
- if (pList.queue.isEmpty()) {
- continue;
- }
- for (Iterator<SelectorInfo> it = pList.queue.iterator(); it.hasNext();) {
- SelectorInfo info = it.next();
- if (info.lastActivityTime > cutoff) {
- break;
- }
- it.remove();
- info.close();
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketInputStream.java
deleted file mode 100644
index eb22ad0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketInputStream.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership.
- */
-
-package org.apache.flink.runtime.net;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-
-/**
- * This implements an input stream that can have a timeout while reading.
- * This sets non-blocking flag on the socket channel.
- * So after create this object, read() on {@link Socket#getInputStream()} and write() on
- * {@link Socket#getOutputStream()} for the associated socket will throw
- * IllegalBlockingModeException.
- * Please use {@link SocketOutputStream} for writing.
- */
-public class SocketInputStream extends InputStream implements ReadableByteChannel {
-
- private Reader reader;
-
- private static class Reader extends SocketIOWithTimeout {
- ReadableByteChannel channel;
-
- Reader(ReadableByteChannel channel, long timeout)
- throws IOException {
- super((SelectableChannel) channel, timeout);
- this.channel = channel;
- }
-
- int performIO(ByteBuffer buf) throws IOException {
- return channel.read(buf);
- }
- }
-
- /**
- * Create a new input stream with the given timeout. If the timeout
- * is zero, it will be treated as infinite timeout. The socket's
- * channel will be configured to be non-blocking.
- *
- * @param channel
- * Channel for reading, should also be a {@link SelectableChannel}.
- * The channel will be configured to be non-blocking.
- * @param timeout
- * timeout in milliseconds. must not be negative.
- * @throws IOException
- */
- public SocketInputStream(ReadableByteChannel channel, long timeout)
- throws IOException {
- SocketIOWithTimeout.checkChannelValidity(channel);
- reader = new Reader(channel, timeout);
- }
-
- /**
- * Same as SocketInputStream(socket.getChannel(), timeout): <br>
- * <br>
- * Create a new input stream with the given timeout. If the timeout
- * is zero, it will be treated as infinite timeout. The socket's
- * channel will be configured to be non-blocking.
- *
- * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
- * @param socket
- * should have a channel associated with it.
- * @param timeout
- * timeout timeout in milliseconds. must not be negative.
- * @throws IOException
- */
- public SocketInputStream(Socket socket, long timeout)
- throws IOException {
- this(socket.getChannel(), timeout);
- }
-
- /**
- * Same as SocketInputStream(socket.getChannel(), socket.getSoTimeout())
- * :<br>
- * <br>
- * Create a new input stream with the given timeout. If the timeout
- * is zero, it will be treated as infinite timeout. The socket's
- * channel will be configured to be non-blocking.
- *
- * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
- * @param socket
- * should have a channel associated with it.
- * @throws IOException
- */
- public SocketInputStream(Socket socket)
- throws IOException {
- this(socket.getChannel(), socket.getSoTimeout());
- }
-
- @Override
- public int read() throws IOException {
- /*
- * Allocation can be removed if required.
- * probably no need to optimize or encourage single byte read.
- */
- byte[] buf = new byte[1];
- int ret = read(buf, 0, 1);
- if (ret > 0) {
- return (byte) buf[0];
- }
- if (ret != -1) {
- // unexpected
- throw new IOException("Could not read from stream");
- }
- return ret;
- }
-
- public int read(byte[] b, int off, int len) throws IOException {
- return read(ByteBuffer.wrap(b, off, len));
- }
-
- public synchronized void close() throws IOException {
- /*
- * close the channel since Socket.getInputStream().close()
- * closes the socket.
- */
- reader.channel.close();
- reader.close();
- }
-
- /**
- * Returns underlying channel used by inputstream.
- * This is useful in certain cases like channel for
- * {@link java.nio.channels.FileChannel#transferFrom(ReadableByteChannel, long, long)}.
- */
- public ReadableByteChannel getChannel() {
- return reader.channel;
- }
-
- // ReadableByteChannel interface
-
- public boolean isOpen() {
- return reader.isOpen();
- }
-
- public int read(ByteBuffer dst) throws IOException {
- return reader.doIO(dst, SelectionKey.OP_READ);
- }
-
- /**
- * waits for the underlying channel to be ready for reading.
- * The timeout specified for this stream applies to this wait.
- *
- * @throws SocketTimeoutException
- * if select on the channel times out.
- * @throws IOException
- * if any other I/O error occurs.
- */
- public void waitForReadable() throws IOException {
- reader.waitForIO(SelectionKey.OP_READ);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketOutputStream.java
deleted file mode 100644
index 2df4bbe..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketOutputStream.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership.
- */
-
-package org.apache.flink.runtime.net;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.WritableByteChannel;
-
-/**
- * This implements an output stream that can have a timeout while writing.
- * This sets non-blocking flag on the socket channel.
- * So after creating this object , read() on {@link Socket#getInputStream()} and write() on
- * {@link Socket#getOutputStream()} on the associated socket will throw
- * llegalBlockingModeException.
- * Please use {@link SocketInputStream} for reading.
- */
-public class SocketOutputStream extends OutputStream implements WritableByteChannel {
-
- private Writer writer;
-
- private static class Writer extends SocketIOWithTimeout {
- WritableByteChannel channel;
-
- Writer(WritableByteChannel channel, long timeout)
- throws IOException {
- super((SelectableChannel) channel, timeout);
- this.channel = channel;
- }
-
- int performIO(ByteBuffer buf) throws IOException {
- return channel.write(buf);
- }
- }
-
- /**
- * Create a new ouput stream with the given timeout. If the timeout
- * is zero, it will be treated as infinite timeout. The socket's
- * channel will be configured to be non-blocking.
- *
- * @param channel
- * Channel for writing, should also be a {@link SelectableChannel}.
- * The channel will be configured to be non-blocking.
- * @param timeout
- * timeout in milliseconds. must not be negative.
- * @throws IOException
- */
- public SocketOutputStream(WritableByteChannel channel, long timeout)
- throws IOException {
- SocketIOWithTimeout.checkChannelValidity(channel);
- writer = new Writer(channel, timeout);
- }
-
- /**
- * Same as SocketOutputStream(socket.getChannel(), timeout):<br>
- * <br>
- * Create a new ouput stream with the given timeout. If the timeout
- * is zero, it will be treated as infinite timeout. The socket's
- * channel will be configured to be non-blocking.
- *
- * @see SocketOutputStream#SocketOutputStream(WritableByteChannel, long)
- * @param socket
- * should have a channel associated with it.
- * @param timeout
- * timeout timeout in milliseconds. must not be negative.
- * @throws IOException
- */
- public SocketOutputStream(Socket socket, long timeout)
- throws IOException {
- this(socket.getChannel(), timeout);
- }
-
- public void write(int b) throws IOException {
- /*
- * If we need to, we can optimize this allocation.
- * probably no need to optimize or encourage single byte writes.
- */
- final byte[] buf = new byte[1];
- buf[0] = (byte) b;
- write(buf, 0, 1);
- }
-
- public void write(byte[] b, int off, int len) throws IOException {
- final ByteBuffer buf = ByteBuffer.wrap(b, off, len);
- while (buf.hasRemaining()) {
- try {
- if (write(buf) < 0) {
- throw new IOException("The stream is closed");
- }
- } catch (IOException e) {
- /*
- * Unlike read, write can not inform user of partial writes.
- * So will close this if there was a partial write.
- */
- if (buf.capacity() > buf.remaining()) {
- writer.close();
- }
- throw e;
- }
- }
- }
-
- public synchronized void close() throws IOException {
- /*
- * close the channel since Socket.getOuputStream().close()
- * closes the socket.
- */
- writer.channel.close();
- writer.close();
- }
-
- /**
- * Returns underlying channel used by this stream.
- * This is useful in certain cases like channel for {@link FileChannel#transferTo(long, long, WritableByteChannel)}
- */
- public WritableByteChannel getChannel() {
- return writer.channel;
- }
-
- // WritableByteChannle interface
-
- public boolean isOpen() {
- return writer.isOpen();
- }
-
- public int write(ByteBuffer src) throws IOException {
- return writer.doIO(src, SelectionKey.OP_WRITE);
- }
-
- /**
- * waits for the underlying channel to be ready for writing.
- * The timeout specified for this stream applies to this wait.
- *
- * @throws SocketTimeoutException
- * if select on the channel times out.
- * @throws IOException
- * if any other I/O error occurs.
- */
- public void waitForWritable() throws IOException {
- writer.waitForIO(SelectionKey.OP_WRITE);
- }
-
- /**
- * Transfers data from FileChannel using {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
- * Similar to readFully(), this waits till requested amount of
- * data is transfered.
- *
- * @param fileCh
- * FileChannel to transfer data from.
- * @param position
- * position within the channel where the transfer begins
- * @param count
- * number of bytes to transfer.
- * @throws EOFException
- * If end of input file is reached before requested number of
- * bytes are transfered.
- * @throws SocketTimeoutException
- * If this channel blocks transfer longer than timeout for
- * this stream.
- * @throws IOException
- * Includes any exception thrown by {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
- */
- public void transferToFully(FileChannel fileCh, long position, int count) throws IOException {
-
- while (count > 0) {
- /*
- * Ideally we should wait after transferTo returns 0. But because of
- * a bug in JRE on Linux (http://bugs.sun.com/view_bug.do?bug_id=5103988),
- * which throws an exception instead of returning 0, we wait for the
- * channel to be writable before writing to it. If you ever see
- * IOException with message "Resource temporarily unavailable"
- * thrown here, please let us know.
- * Once we move to JAVA SE 7, wait should be moved to correct place.
- */
- waitForWritable();
- final int nTransfered = (int) fileCh.transferTo(position, count, getChannel());
-
- if (nTransfered == 0) {
- // check if end of file is reached.
- if (position >= fileCh.size()) {
- throw new EOFException("EOF Reached. file size is " + fileCh.size() + " and " + count
- + " more bytes left to be " + "transfered.");
- }
- // otherwise assume the socket is full.
- // waitForWritable(); // see comment above.
- } else if (nTransfered < 0) {
- throw new IOException("Unexpected return of " + nTransfered + " from transferTo()");
- } else {
- position += nTransfered;
- count -= nTransfered;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
index 892e68d..c8ec180 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
@@ -30,13 +30,17 @@ trait ActorLogMessages {
override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x)
override def apply(x: Any):Unit = {
- if(log.isDebugEnabled) {
- log.debug(s"Received message ${x} from ${self.sender}.")
+ if (!log.isDebugEnabled) {
+ _receiveWithLogMessages(x)
}
- val start = System.nanoTime()
- _receiveWithLogMessages(x)
- val duration = (System.nanoTime() - start) / 1000000
- if(log.isDebugEnabled) {
+ else {
+ log.debug(s"Received message ${x} from ${self.sender}.")
+
+ val start = System.nanoTime()
+
+ _receiveWithLogMessages(x)
+
+ val duration = (System.nanoTime() - start) / 1000000
log.debug(s"Handled message ${x} in ${duration} ms from ${self.sender}.")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala
index dff72bb..3a906b1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.io.network.serialization.{DataInputDeserializer,
import org.apache.flink.util.InstantiationUtil
class IOReadableWritableSerializer extends JSerializer {
- val INITIAL_BUFFER_SIZE = 8096
+ val INITIAL_BUFFER_SIZE = 256
override protected def fromBinaryJava(bytes: Array[Byte], manifest: Class[_]): AnyRef = {
val in = new DataInputDeserializer(bytes, 0, bytes.length)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala
index 0ac7ed6..be1ef17 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala
@@ -25,7 +25,7 @@ import org.apache.flink.util.InstantiationUtil
import org.apache.hadoop.io.Writable
class WritableSerializer extends JSerializer {
- val INITIAL_BUFFER_SIZE = 8096
+ val INITIAL_BUFFER_SIZE = 256
override protected def fromBinaryJava(bytes: Array[Byte], manifest: Class[_]): AnyRef = {
val in = new DataInputDeserializer(bytes, 0, bytes.length)