You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/03 20:50:21 UTC

[1/8] flink git commit: [FLINK-2426] [core] Cleanup/improve unmodifiable configuration.

Repository: flink
Updated Branches:
  refs/heads/master fab61a195 -> d0cd1c7d4


[FLINK-2426] [core] Cleanup/improve unmodifiable configuration.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5bf2197b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5bf2197b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5bf2197b

Branch: refs/heads/master
Commit: 5bf2197b148f2b9857d9fcdb5859f37ee8997100
Parents: 8eb9cbf
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 3 13:52:12 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:48:07 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/Configuration.java      | 27 ++++++--
 .../UnmodifiableConfiguration.java              | 70 +++++---------------
 .../flink/configuration/ConfigurationTest.java  | 21 +++++-
 .../UnmodifiableConfigurationTest.java          | 67 ++++++++++++++++---
 4 files changed, 113 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5bf2197b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index e9d7621..da42958 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -34,10 +34,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Lightweight configuration object which can store key/value pairs.
+ * Lightweight configuration object which stores key/value pairs.
  */
-@SuppressWarnings("EqualsBetweenInconvertibleTypes")
-public class Configuration extends ExecutionConfig.GlobalJobParameters implements IOReadableWritable, java.io.Serializable, Cloneable {
+public class Configuration extends ExecutionConfig.GlobalJobParameters 
+		implements IOReadableWritable, java.io.Serializable, Cloneable {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -54,11 +54,25 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters implement
 	
 
 	/** Stores the concrete key/value pairs of this configuration object. */
-	private final Map<String, Object> confData = new HashMap<String, Object>();
+	private final HashMap<String, Object> confData;
 	
 	// --------------------------------------------------------------------------------------------
-	
-	public Configuration() {}
+
+	/**
+	 * Creates a new empty configuration.
+	 */
+	public Configuration() {
+		this.confData = new HashMap<String, Object>();
+	}
+
+	/**
+	 * Creates a new configuration with the copy of the given configuration.
+	 * 
+	 * @param other The configuration to copy the entries from.
+	 */
+	public Configuration(Configuration other) {
+		this.confData = new HashMap<String, Object>(other.confData);
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -362,6 +376,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters implement
 	 *        The default value which is returned in case there is no value associated with the given key.
 	 * @return the (default) value associated with the given key.
 	 */
+	@SuppressWarnings("EqualsBetweenInconvertibleTypes")
 	public byte[] getBytes(String key, byte[] defaultValue) {
 		
 		Object o = getRawValue(key);

http://git-wip-us.apache.org/repos/asf/flink/blob/5bf2197b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
index b436a53..5d92cf0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
@@ -18,67 +18,28 @@
 
 package org.apache.flink.configuration;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
- * Unmodifiable version of the Configuration class
+ * Unmodifiable version of the Configuration class.
  */
 public class UnmodifiableConfiguration extends Configuration {
-
-	/** The log object used for debugging. */
-	private static final Logger LOG = LoggerFactory.getLogger(UnmodifiableConfiguration.class);
-
+	
+	private static final long serialVersionUID = -8151292629158972280L;
+
+	/**
+	 * Creates a new UnmodifiableConfiguration, which holds a copy of the given configuration
+	 * that cannot be altered.
+	 * 
+	 * @param config The configuration with the original contents.
+	 */
 	public UnmodifiableConfiguration(Configuration config) {
-		super();
-		super.addAll(config);
+		super(config);
 	}
 
 	// --------------------------------------------------------------------------------------------
-	//  All setter methods must fail.
+	//  All mutating methods must fail
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public final void setClass(String key, Class<?> klazz) {
-		error();
-	}
-
-	@Override
-	public final void setString(String key, String value) {
-		error();
-	}
-
-	@Override
-	public final void setInteger(String key, int value) {
-		error();
-	}
-
-	@Override
-	public final void setLong(String key, long value) {
-		error();
-	}
-
-	@Override
-	public final void setBoolean(String key, boolean value) {
-		error();
-	}
-
-	@Override
-	public final void setFloat(String key, float value) {
-		error();
-	}
-
-	@Override
-	public final void setDouble(String key, double value) {
-		error();
-	}
-
-	@Override
-	public final void setBytes(String key, byte[] bytes) {
-		error();
-	}
-
-	@Override
 	public final void addAll(Configuration other) {
 		error();
 	}
@@ -89,12 +50,11 @@ public class UnmodifiableConfiguration extends Configuration {
 	}
 
 	@Override
-	<T> void setValueInternal(String key, T value){
+	final <T> void setValueInternal(String key, T value){
 		error();
 	}
 
-	private final void error(){
-		throw new UnsupportedOperationException("The unmodifiable configuration object doesn't allow set methods.");
+	private void error(){
+		throw new UnsupportedOperationException("The configuration is unmodifiable; its contents cannot be changed.");
 	}
-	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5bf2197b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index e131892..33dde3d 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -23,8 +23,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
+
 import org.junit.Test;
 
 /**
@@ -175,4 +175,23 @@ public class ConfigurationTest {
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testCopyConstructor() {
+		try {
+			final String key = "theKey";
+			
+			Configuration cfg1 = new Configuration();
+			cfg1.setString(key, "value");
+			
+			Configuration cfg2 = new Configuration(cfg1);
+			cfg2.setString(key, "another value");
+			
+			assertEquals("value", cfg1.getString(key, ""));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5bf2197b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
index 302b72b..3ea52b8 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
@@ -18,29 +18,76 @@
 
 package org.apache.flink.configuration;
 
-
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import org.junit.Test;
 
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * This class verifies that the Unmodifiable Configuration class overrides all setter methods in
  * Configuration.
  */
 public class UnmodifiableConfigurationTest {
-
-	private static Configuration pc = new Configuration();
-	private static UnmodifiableConfiguration unConf = new UnmodifiableConfiguration(pc);
-	private static Class clazz = unConf.getClass();
-
+	
 	@Test
-	public void testOverride() throws Exception{
-		for(Method m : clazz.getMethods()){
-			if(m.getName().indexOf("set") == 0 || m.getName().indexOf("add") == 0 ) {
-				assertEquals(clazz, m.getDeclaringClass());
+	public void testOverrideAddMethods() {
+		try {
+			Class<UnmodifiableConfiguration> clazz = UnmodifiableConfiguration.class;
+			for (Method m : clazz.getMethods()) {
+				if (m.getName().startsWith("add")) {
+					assertEquals(clazz, m.getDeclaringClass());
+				}
 			}
 		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testExceptionOnSet() {
+		try {
+			Map<Class<?>, Object> parameters = new HashMap<Class<?>, Object>();
+			parameters.put(byte[].class, new byte[0]);
+			parameters.put(Class.class, Object.class);
+			parameters.put(int.class, 0);
+			parameters.put(long.class, 0L);
+			parameters.put(float.class, 0.0f);
+			parameters.put(double.class, 0.0);
+			parameters.put(String.class, "");
+			parameters.put(boolean.class, false);
+					
+			Class<UnmodifiableConfiguration> clazz = UnmodifiableConfiguration.class;
+			UnmodifiableConfiguration config = new UnmodifiableConfiguration(new Configuration());
+			
+			for (Method m : clazz.getMethods()) {
+				if (m.getName().startsWith("set")) {
+					
+					Class<?> parameterClass = m.getParameterTypes()[1];
+					Object parameter = parameters.get(parameterClass);
+					assertNotNull("method " + m + " not covered by test", parameter);
+					
+					try {
+						m.invoke(config, "key", parameter);
+						fail("should fail with an exception");
+					}
+					catch (InvocationTargetException e) {
+						assertTrue(e.getTargetException() instanceof UnsupportedOperationException);
+					}
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 }


[4/8] flink git commit: [FLINK-2425] [FLINK-2426] [runtime] Add an unmodifiable config and provide access to task manager configuration and hostname inside RuntimeEnvironment

Posted by se...@apache.org.
[FLINK-2425] [FLINK-2426] [runtime] Add an unmodifiable config and provide access to task manager configuration and hostname inside RuntimeEnvironment


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8eb9cbf8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8eb9cbf8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8eb9cbf8

Branch: refs/heads/master
Commit: 8eb9cbf88111b0375860a3965e093a736455db79
Parents: 24dee42
Author: Sachin Goel <sa...@gmail.com>
Authored: Wed Jul 29 18:51:58 2015 +0530
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:48:07 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/Configuration.java      |   2 +-
 .../UnmodifiableConfiguration.java              | 100 +++++++++++++++++++
 .../UnmodifiableConfigurationTest.java          |  46 +++++++++
 .../flink/runtime/execution/Environment.java    |  10 ++
 .../runtime/taskmanager/RuntimeEnvironment.java |  22 +++-
 .../apache/flink/runtime/taskmanager/Task.java  |   9 +-
 .../taskmanager/RuntimeConfiguration.scala      |  23 +++++
 .../flink/runtime/taskmanager/TaskManager.scala |   7 +-
 .../operators/testutils/MockEnvironment.java    |  11 ++
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  10 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   6 +-
 .../runtime/tasks/StreamMockEnvironment.java    |  11 ++
 12 files changed, 246 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index c095b5f..e9d7621 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -472,7 +472,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters implement
 
 	// --------------------------------------------------------------------------------------------
 	
-	private <T> void setValueInternal(String key, T value) {
+	<T> void setValueInternal(String key, T value) {
 		if (key == null) {
 			throw new NullPointerException("Key must not be null.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
new file mode 100644
index 0000000..b436a53
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unmodifiable version of the Configuration class
+ */
+public class UnmodifiableConfiguration extends Configuration {
+
+	/** The log object used for debugging. */
+	private static final Logger LOG = LoggerFactory.getLogger(UnmodifiableConfiguration.class);
+
+	public UnmodifiableConfiguration(Configuration config) {
+		super();
+		super.addAll(config);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  All setter methods must fail.
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public final void setClass(String key, Class<?> klazz) {
+		error();
+	}
+
+	@Override
+	public final void setString(String key, String value) {
+		error();
+	}
+
+	@Override
+	public final void setInteger(String key, int value) {
+		error();
+	}
+
+	@Override
+	public final void setLong(String key, long value) {
+		error();
+	}
+
+	@Override
+	public final void setBoolean(String key, boolean value) {
+		error();
+	}
+
+	@Override
+	public final void setFloat(String key, float value) {
+		error();
+	}
+
+	@Override
+	public final void setDouble(String key, double value) {
+		error();
+	}
+
+	@Override
+	public final void setBytes(String key, byte[] bytes) {
+		error();
+	}
+
+	@Override
+	public final void addAll(Configuration other) {
+		error();
+	}
+
+	@Override
+	public final void addAll(Configuration other, String prefix) {
+		error();
+	}
+
+	@Override
+	<T> void setValueInternal(String key, T value){
+		error();
+	}
+
+	private final void error(){
+		throw new UnsupportedOperationException("The unmodifiable configuration object doesn't allow set methods.");
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
new file mode 100644
index 0000000..302b72b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+
+/**
+ * This class verifies that the Unmodifiable Configuration class overrides all setter methods in
+ * Configuration.
+ */
+public class UnmodifiableConfigurationTest {
+
+	private static Configuration pc = new Configuration();
+	private static UnmodifiableConfiguration unConf = new UnmodifiableConfiguration(pc);
+	private static Class clazz = unConf.getClass();
+
+	@Test
+	public void testOverride() throws Exception{
+		for(Method m : clazz.getMethods()){
+			if(m.getName().indexOf("set") == 0 || m.getName().indexOf("add") == 0 ) {
+				assertEquals(clazz, m.getDeclaringClass());
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index c561869..af29560 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -72,6 +72,16 @@ public interface Environment {
 	Configuration getTaskConfiguration();
 
 	/**
+	 * @return The task manager configuration
+	 */
+	Configuration getTaskManagerConfiguration();
+
+	/**
+	 * @return Hostname of the task manager
+	 */
+	String getHostname();
+
+	/**
 	 * Returns the job-wide configuration object that was attached to the JobGraph.
 	 *
 	 * @return The job-wide configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 5e276bf..c0dfee6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -75,6 +75,10 @@ public class RuntimeEnvironment implements Environment {
 
 	private final AccumulatorRegistry accumulatorRegistry;
 
+	private Configuration taskManagerConfiguration;
+
+	private String hostname;
+
 	// ------------------------------------------------------------------------
 
 	public RuntimeEnvironment(
@@ -96,7 +100,8 @@ public class RuntimeEnvironment implements Environment {
 			Map<String, Future<Path>> distCacheEntries,
 			ResultPartitionWriter[] writers,
 			InputGate[] inputGates,
-			ActorGateway jobManager) {
+			ActorGateway jobManager,
+			RuntimeConfiguration taskManagerConfig) {
 		
 		checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism);
 
@@ -119,9 +124,10 @@ public class RuntimeEnvironment implements Environment {
 		this.writers = checkNotNull(writers);
 		this.inputGates = checkNotNull(inputGates);
 		this.jobManager = checkNotNull(jobManager);
+		this.taskManagerConfiguration = checkNotNull(taskManagerConfig).configuration();
+		this.hostname = taskManagerConfig.hostname();
 	}
 
-
 	// ------------------------------------------------------------------------
 	
 	@Override
@@ -168,7 +174,17 @@ public class RuntimeEnvironment implements Environment {
 	public Configuration getTaskConfiguration() {
 		return taskConfiguration;
 	}
-	
+
+	@Override
+	public Configuration getTaskManagerConfiguration(){
+		return taskManagerConfiguration;
+	}
+
+	@Override
+	public String getHostname(){
+		return hostname;
+	}
+
 	@Override
 	public ClassLoader getUserClassLoader() {
 		return userCodeClassLoader;

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c4f62fb..878a69a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -214,6 +214,9 @@ public class Task implements Runnable {
 	 * initialization, to be memory friendly */
 	private volatile SerializedValue<StateHandle<?>> operatorState;
 
+	/** Access to task manager configuration and host names*/
+	private RuntimeConfiguration taskManagerConfig;
+
 	/**
 	 * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to 
 	 * be undone in the case of a failing task deployment.</p>
@@ -227,7 +230,8 @@ public class Task implements Runnable {
 				ActorGateway jobManagerActor,
 				FiniteDuration actorAskTimeout,
 				LibraryCacheManager libraryCache,
-				FileCache fileCache)
+				FileCache fileCache,
+				RuntimeConfiguration taskManagerConfig)
 	{
 		checkArgument(tdd.getNumberOfSubtasks() > 0);
 		checkArgument(tdd.getIndexInSubtaskGroup() >= 0);
@@ -258,6 +262,7 @@ public class Task implements Runnable {
 		this.libraryCache = checkNotNull(libraryCache);
 		this.fileCache = checkNotNull(fileCache);
 		this.network = checkNotNull(networkEnvironment);
+		this.taskManagerConfig = checkNotNull(taskManagerConfig);
 
 		this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>();
 
@@ -510,7 +515,7 @@ public class Task implements Runnable {
 					userCodeClassLoader, memoryManager, ioManager,
 					broadcastVariableManager, accumulatorRegistry,
 					splitProvider, distributedCacheEntries,
-					writers, inputGates, jobManager);
+					writers, inputGates, jobManager, taskManagerConfig);
 
 			// let the task code create its readers and writers
 			invokable.setEnvironment(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
new file mode 100644
index 0000000..ef0e705
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager
+
+import org.apache.flink.configuration.UnmodifiableConfiguration
+
+case class RuntimeConfiguration(hostname: String, configuration: UnmodifiableConfiguration)

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0ec1040..3ab271a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -36,7 +36,7 @@ import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
 import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 
-import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException}
+import org.apache.flink.configuration._
 
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
@@ -892,7 +892,10 @@ class TaskManager(
         jobManagerGateway,
         config.timeout,
         libCache,
-        fileCache)
+        fileCache,
+        new RuntimeConfiguration(
+          self.path.toSerializationFormat,
+          new UnmodifiableConfiguration(config.configuration)))
 
       log.info(s"Received task ${task.getTaskNameWithSubtasks}")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index b9cb416..b71b01e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.operators.testutils;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -192,6 +193,16 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
+	public Configuration getTaskManagerConfiguration(){
+		return new UnmodifiableConfiguration(new Configuration());
+	}
+
+	@Override
+	public String getHostname(){
+		return "localhost";
+	}
+
+	@Override
 	public int getNumberOfSubtasks() {
 		return 1;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 10a33c3..08f0094 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -148,16 +150,20 @@ public class TaskAsyncCallTest {
 				Collections.<BlobKey>emptyList(),
 				0);
 
+		ActorGateway taskManagerGateway = DummyActorGateway.INSTANCE;
 		return new Task(tdd,
 				mock(MemoryManager.class),
 				mock(IOManager.class),
 				networkEnvironment,
 				mock(BroadcastVariableManager.class),
-				DummyActorGateway.INSTANCE,
+				taskManagerGateway,
 				DummyActorGateway.INSTANCE,
 				new FiniteDuration(60, TimeUnit.SECONDS),
 				libCache,
-				mock(FileCache.class));
+				mock(FileCache.class),
+				new RuntimeConfiguration(
+						taskManagerGateway.path(),
+						new UnmodifiableConfiguration(new Configuration())));
 	}
 	
 	public static class CheckpointsInOrderInvokable extends AbstractInvokable

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index c6a8cb8..6d9df6d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import com.google.common.collect.Maps;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -725,7 +726,10 @@ public class TaskTest {
 				jobManagerGateway,
 				new FiniteDuration(60, TimeUnit.SECONDS),
 				libCache,
-				mock(FileCache.class));
+				mock(FileCache.class),
+				new RuntimeConfiguration(
+						taskManagerGateway.path(),
+						new UnmodifiableConfiguration(new Configuration())));
 	}
 
 	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index bbc64f1..44013ef 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -290,5 +291,15 @@ public class StreamMockEnvironment implements Environment {
 	@Override
 	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
 	}
+
+	@Override
+	public Configuration getTaskManagerConfiguration(){
+		return new UnmodifiableConfiguration(new Configuration());
+	}
+
+	@Override
+	public String getHostname(){
+		return "localhost";
+	}
 }
 


[8/8] flink git commit: [FLINK-2464] [tests] Change log level of BufferSpillerTest to "info" to let statements occur in CI logs.

Posted by se...@apache.org.
[FLINK-2464] [tests] Change log level of BufferSpillerTest to "info" to let statements occur in CI logs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0cd1c7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0cd1c7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0cd1c7d

Branch: refs/heads/master
Commit: d0cd1c7d428a2bde6b955599b247b209240f2a34
Parents: 2cff89e
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 3 19:23:57 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 19:23:57 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/streaming/runtime/io/BufferSpillerTest.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0cd1c7d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index ca6df16..fbc19ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -183,7 +183,7 @@ public class BufferSpillerTest {
 
 	@Test
 	public void testSpillWhileReading() {
-		LOG.debug("Starting SpillWhileReading test");
+		LOG.info("Starting SpillWhileReading test");
 		
 		try {
 			final int sequences = 10;
@@ -359,7 +359,7 @@ public class BufferSpillerTest {
 					int numBuffersAndEvents = nextSequence.numBuffersAndEvents;
 					int numChannels = nextSequence.numChannels;
 
-					LOG.debug("Reading sequence {}", consumedSequences);
+					LOG.info("Reading sequence {}", consumedSequences);
 					
 					// consume sequence
 					seq.open();


[2/8] flink git commit: [hotfix] Remove unused (and broken) TraversableOnceIterable.

Posted by se...@apache.org.
[hotfix] Remove unused (and broken) TraversableOnceIterable.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24dee42e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24dee42e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24dee42e

Branch: refs/heads/master
Commit: 24dee42e0a68a38e87a5ec53e683c086a47698b3
Parents: b08e30a
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 3 12:33:47 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:48:07 2015 +0200

----------------------------------------------------------------------
 .../flink/util/TraversableOnceIterable.java     | 42 --------------------
 1 file changed, 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24dee42e/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java
deleted file mode 100644
index 73e3cd6..0000000
--- a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util;
-
-import java.util.Iterator;
-
-public class TraversableOnceIterable<T> implements Iterable<T> {
-	
-	private final Iterator<T> iterator;
-
-	public TraversableOnceIterable(Iterator<T> iterator) {
-		if (iterator == null) {
-			throw new NullPointerException("The iterator must not be null.");
-		}
-		this.iterator = iterator;
-	}
-	
-	@Override
-	public Iterator<T> iterator() {
-		if (iterator != null) {
-			return iterator;
-		} else {
-			throw new TraversableOnceException();
-		}
-	}
-}


[3/8] flink git commit: [FLINK-2465] [streaming] SocketClientSink closes connection early

Posted by se...@apache.org.
[FLINK-2465] [streaming] SocketClientSink closes connection early

This closes #972


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b08e30ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b08e30ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b08e30ad

Branch: refs/heads/master
Commit: b08e30ad9a39ed9885f4daa05a08d13eb22ebb8c
Parents: 0693c92
Author: ffbin <86...@qq.com>
Authored: Mon Aug 3 15:50:26 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:48:07 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/functions/sink/SocketClientSink.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b08e30ad/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index da8fd7f..adffe5e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -88,7 +88,7 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 	 */
 	private void closeConnection(){
 		try {
-			dataOutputStream.flush();
+			dataOutputStream.close();
 			client.close();
 		} catch (IOException e) {
 			throw new RuntimeException("Error while closing connection with socket server at "


[7/8] flink git commit: [FLINK-2473] [core] Add a timeout to akka actorsystem shutdown.

Posted by se...@apache.org.
[FLINK-2473] [core] Add a timeout to akka actorsystem shutdown.

This works around a bug in akka where the "awaitTermination()" call freezes indefinitely.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cff89ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cff89ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cff89ed

Branch: refs/heads/master
Commit: 2cff89ed93bbacf8d054399514c1ca4fb8b24730
Parents: c3ef61d
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 3 16:28:34 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:49:48 2015 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/client/program/Client.java   | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2cff89ed/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index d2022bb..78c82f6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -26,6 +26,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
@@ -441,7 +442,9 @@ public class Client {
 		}
 		finally {
 			actorSystem.shutdown();
-			actorSystem.awaitTermination();
+			
+			// wait at most for 30 seconds, to work around an occasional akka problem
+			actorSystem.awaitTermination(new FiniteDuration(30, TimeUnit.SECONDS));
 		}
 	}
 


[6/8] flink git commit: [FLINK-2425] [runtime] Cleanup code for forwarding config and hostname into TaskManager's RuntimeEnvironment

Posted by se...@apache.org.
[FLINK-2425] [runtime] Cleanup code for forwarding config and hostname into TaskManager's RuntimeEnvironment


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3ef61de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3ef61de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3ef61de

Branch: refs/heads/master
Commit: c3ef61de934a9c447ec442449c527ce719ee46c6
Parents: 5bf2197
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 3 17:41:57 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:49:45 2015 +0200

----------------------------------------------------------------------
 .../runtime/taskmanager/RuntimeEnvironment.java | 12 ++--
 .../apache/flink/runtime/taskmanager/Task.java  |  8 +--
 .../taskmanager/TaskManagerRuntimeInfo.java     | 61 ++++++++++++++++++++
 .../taskmanager/RuntimeConfiguration.scala      | 23 --------
 .../flink/runtime/taskmanager/TaskManager.scala |  8 ++-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  6 +-
 .../flink/runtime/taskmanager/TaskTest.java     |  8 +--
 7 files changed, 80 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index c0dfee6..cd6dbd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -75,9 +75,9 @@ public class RuntimeEnvironment implements Environment {
 
 	private final AccumulatorRegistry accumulatorRegistry;
 
-	private Configuration taskManagerConfiguration;
+	private final Configuration taskManagerConfiguration;
 
-	private String hostname;
+	private final String hostname;
 
 	// ------------------------------------------------------------------------
 
@@ -95,13 +95,13 @@ public class RuntimeEnvironment implements Environment {
 			MemoryManager memManager,
 			IOManager ioManager,
 			BroadcastVariableManager bcVarManager,
-								AccumulatorRegistry accumulatorRegistry,
+			AccumulatorRegistry accumulatorRegistry,
 			InputSplitProvider splitProvider,
 			Map<String, Future<Path>> distCacheEntries,
 			ResultPartitionWriter[] writers,
 			InputGate[] inputGates,
 			ActorGateway jobManager,
-			RuntimeConfiguration taskManagerConfig) {
+			TaskManagerRuntimeInfo taskManagerInfo) {
 		
 		checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism);
 
@@ -124,8 +124,8 @@ public class RuntimeEnvironment implements Environment {
 		this.writers = checkNotNull(writers);
 		this.inputGates = checkNotNull(inputGates);
 		this.jobManager = checkNotNull(jobManager);
-		this.taskManagerConfiguration = checkNotNull(taskManagerConfig).configuration();
-		this.hostname = taskManagerConfig.hostname();
+		this.taskManagerConfiguration = checkNotNull(taskManagerInfo).getConfiguration();
+		this.hostname = taskManagerInfo.getHostname();
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 878a69a..36de90a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -145,6 +145,9 @@ public class Task implements Runnable {
 	/** The name of the class that holds the invokable code */
 	private final String nameOfInvokableClass;
 
+	/** Access to task manager configuration and host names*/
+	private final TaskManagerRuntimeInfo taskManagerConfig;
+	
 	/** The memory manager to be used by this task */
 	private final MemoryManager memoryManager;
 
@@ -214,9 +217,6 @@ public class Task implements Runnable {
 	 * initialization, to be memory friendly */
 	private volatile SerializedValue<StateHandle<?>> operatorState;
 
-	/** Access to task manager configuration and host names*/
-	private RuntimeConfiguration taskManagerConfig;
-
 	/**
 	 * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to 
 	 * be undone in the case of a failing task deployment.</p>
@@ -231,7 +231,7 @@ public class Task implements Runnable {
 				FiniteDuration actorAskTimeout,
 				LibraryCacheManager libraryCache,
 				FileCache fileCache,
-				RuntimeConfiguration taskManagerConfig)
+				TaskManagerRuntimeInfo taskManagerConfig)
 	{
 		checkArgument(tdd.getNumberOfSubtasks() > 0);
 		checkArgument(tdd.getIndexInSubtaskGroup() >= 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
new file mode 100644
index 0000000..8d06f10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Encapsulation of TaskManager runtime information, like hostname and configuration.
+ */
+public class TaskManagerRuntimeInfo implements java.io.Serializable {
+
+	private static final long serialVersionUID = 5598219619760274072L;
+	
+	/** host name of the interface that the TaskManager uses to communicate */
+	private final String hostname;
+
+	/** configuration that the TaskManager was started with */
+	private final Configuration configuration;
+
+	/**
+	 * Creates a runtime info.
+	 * @param hostname The host name of the interface that the TaskManager uses to communicate.
+	 * @param configuration The configuration that the TaskManager was started with.
+	 */
+	public TaskManagerRuntimeInfo(String hostname, Configuration configuration) {
+		this.hostname = hostname;
+		this.configuration = configuration;
+	}
+
+	/**
+	 * Gets host name of the interface that the TaskManager uses to communicate.
+	 * @return The host name of the interface that the TaskManager uses to communicate.
+	 */
+	public String getHostname() {
+		return hostname;
+	}
+
+	/**
+	 * Gets the configuration that the TaskManager was started with.
+	 * @return The configuration that the TaskManager was started with.
+	 */
+	public Configuration getConfiguration() {
+		return configuration;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
deleted file mode 100644
index ef0e705..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-import org.apache.flink.configuration.UnmodifiableConfiguration
-
-case class RuntimeConfiguration(hostname: String, configuration: UnmodifiableConfiguration)

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 3ab271a..cc8b8ba 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -173,6 +173,10 @@ class TaskManager(
 
   private val currentRegistrationSessionID: UUID = UUID.randomUUID()
 
+  private val runtimeInfo = new TaskManagerRuntimeInfo(
+       connectionInfo.getHostname(),
+       new UnmodifiableConfiguration(config.configuration))
+
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
   // --------------------------------------------------------------------------
@@ -893,9 +897,7 @@ class TaskManager(
         config.timeout,
         libCache,
         fileCache,
-        new RuntimeConfiguration(
-          self.path.toSerializationFormat,
-          new UnmodifiableConfiguration(config.configuration)))
+        runtimeInfo)
 
       log.info(s"Received task ${task.getTaskNameWithSubtasks}")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 08f0094..a7d8d8d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -129,7 +128,6 @@ public class TaskAsyncCallTest {
 	}
 	
 	private static Task createTask() {
-		
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
 		
@@ -161,9 +159,7 @@ public class TaskAsyncCallTest {
 				new FiniteDuration(60, TimeUnit.SECONDS),
 				libCache,
 				mock(FileCache.class),
-				new RuntimeConfiguration(
-						taskManagerGateway.path(),
-						new UnmodifiableConfiguration(new Configuration())));
+				new TaskManagerRuntimeInfo("localhost", new Configuration()));
 	}
 	
 	public static class CheckpointsInOrderInvokable extends AbstractInvokable

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 6d9df6d..0cba533 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -19,9 +19,8 @@
 package org.apache.flink.runtime.taskmanager;
 
 import com.google.common.collect.Maps;
+
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -49,6 +48,7 @@ import org.apache.flink.runtime.messages.TaskMessages;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.reflect.Field;
@@ -727,9 +727,7 @@ public class TaskTest {
 				new FiniteDuration(60, TimeUnit.SECONDS),
 				libCache,
 				mock(FileCache.class),
-				new RuntimeConfiguration(
-						taskManagerGateway.path(),
-						new UnmodifiableConfiguration(new Configuration())));
+				new TaskManagerRuntimeInfo("localhost", new Configuration()));
 	}
 
 	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {


[5/8] flink git commit: [FLINK-2322] [streaming] Close file streams to release resources early.

Posted by se...@apache.org.
[FLINK-2322] [streaming] Close file streams to release resources early.

This closes #928


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0693c92b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0693c92b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0693c92b

Branch: refs/heads/master
Commit: 0693c92bdda655e1fbce232038909a7c2a385a22
Parents: fab61a1
Author: tedyu <yu...@gmail.com>
Authored: Tue Jul 21 11:13:17 2015 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:48:07 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/sca/UdfAnalyzerUtils.java     | 19 ++++++++++++++++---
 .../flink/api/java/utils/ParameterTool.java      |  4 +++-
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0693c92b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
index df1e421..dbfd29e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
@@ -32,6 +32,7 @@ import org.objectweb.asm.tree.analysis.BasicValue;
 import org.objectweb.asm.tree.analysis.Value;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -114,12 +115,14 @@ public final class UdfAnalyzerUtils {
 	 */
 	@SuppressWarnings("unchecked")
 	public static Object[] findMethodNode(String internalClassName, String name, String desc) {
+		InputStream stream = null;
 		try {
 			// iterate through hierarchy and search for method node /
 			// class that really implements the method
 			while (internalClassName != null) {
-				ClassReader cr = new ClassReader(Thread.currentThread().getContextClassLoader()
-						.getResourceAsStream(internalClassName.replace('.', '/') + ".class"));
+				stream = Thread.currentThread().getContextClassLoader()
+						.getResourceAsStream(internalClassName.replace('.', '/') + ".class");
+				ClassReader cr = new ClassReader(stream);
 				final ClassNode cn = new ClassNode();
 				cr.accept(cn, 0);
 				for (MethodNode mn : (List<MethodNode>) cn.methods) {
@@ -129,9 +132,19 @@ public final class UdfAnalyzerUtils {
 				}
 				internalClassName = cr.getSuperName();
 			}
-		} catch (IOException e) {
+		}
+		catch (IOException e) {
 			throw new IllegalStateException("Method '" + name + "' could not be found", e);
 		}
+		finally {
+			if (stream != null) {
+				try {
+					stream.close();
+				} catch (IOException e) { 
+					// best effort cleanup
+				}
+			}
+		}
 		throw new IllegalStateException("Method '" + name + "' could not be found");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0693c92b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 317dce4..b60a559 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -130,7 +130,9 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 			throw new FileNotFoundException("Properties file "+path+" does not exist");
 		}
 		Properties props = new Properties();
-		props.load(new FileInputStream(propertiesFile));
+		FileInputStream fis = new FileInputStream(propertiesFile);
+		props.load(fis);
+		fis.close();
 		return fromMap((Map)props);
 	}