You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/03 20:50:21 UTC
[1/8] flink git commit: [FLINK-2426] [core] Cleanup/improve
unmodifiable configuration.
Repository: flink
Updated Branches:
refs/heads/master fab61a195 -> d0cd1c7d4
[FLINK-2426] [core] Cleanup/improve unmodifiable configuration.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5bf2197b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5bf2197b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5bf2197b
Branch: refs/heads/master
Commit: 5bf2197b148f2b9857d9fcdb5859f37ee8997100
Parents: 8eb9cbf
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 3 13:52:12 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:48:07 2015 +0200
----------------------------------------------------------------------
.../flink/configuration/Configuration.java | 27 ++++++--
.../UnmodifiableConfiguration.java | 70 +++++---------------
.../flink/configuration/ConfigurationTest.java | 21 +++++-
.../UnmodifiableConfigurationTest.java | 67 ++++++++++++++++---
4 files changed, 113 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5bf2197b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index e9d7621..da42958 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -34,10 +34,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Lightweight configuration object which can store key/value pairs.
+ * Lightweight configuration object which stores key/value pairs.
*/
-@SuppressWarnings("EqualsBetweenInconvertibleTypes")
-public class Configuration extends ExecutionConfig.GlobalJobParameters implements IOReadableWritable, java.io.Serializable, Cloneable {
+public class Configuration extends ExecutionConfig.GlobalJobParameters
+ implements IOReadableWritable, java.io.Serializable, Cloneable {
private static final long serialVersionUID = 1L;
@@ -54,11 +54,25 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters implement
/** Stores the concrete key/value pairs of this configuration object. */
- private final Map<String, Object> confData = new HashMap<String, Object>();
+ private final HashMap<String, Object> confData;
// --------------------------------------------------------------------------------------------
-
- public Configuration() {}
+
+ /**
+ * Creates a new empty configuration.
+ */
+ public Configuration() {
+ this.confData = new HashMap<String, Object>();
+ }
+
+ /**
+ * Creates a new configuration with the copy of the given configuration.
+ *
+ * @param other The configuration to copy the entries from.
+ */
+ public Configuration(Configuration other) {
+ this.confData = new HashMap<String, Object>(other.confData);
+ }
// --------------------------------------------------------------------------------------------
@@ -362,6 +376,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters implement
* The default value which is returned in case there is no value associated with the given key.
* @return the (default) value associated with the given key.
*/
+ @SuppressWarnings("EqualsBetweenInconvertibleTypes")
public byte[] getBytes(String key, byte[] defaultValue) {
Object o = getRawValue(key);
http://git-wip-us.apache.org/repos/asf/flink/blob/5bf2197b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
index b436a53..5d92cf0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
@@ -18,67 +18,28 @@
package org.apache.flink.configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
- * Unmodifiable version of the Configuration class
+ * Unmodifiable version of the Configuration class.
*/
public class UnmodifiableConfiguration extends Configuration {
-
- /** The log object used for debugging. */
- private static final Logger LOG = LoggerFactory.getLogger(UnmodifiableConfiguration.class);
-
+
+ private static final long serialVersionUID = -8151292629158972280L;
+
+ /**
+ * Creates a new UnmodifiableConfiguration, which holds a copy of the given configuration
+ * that cannot be altered.
+ *
+ * @param config The configuration with the original contents.
+ */
public UnmodifiableConfiguration(Configuration config) {
- super();
- super.addAll(config);
+ super(config);
}
// --------------------------------------------------------------------------------------------
- // All setter methods must fail.
+ // All mutating methods must fail
// --------------------------------------------------------------------------------------------
@Override
- public final void setClass(String key, Class<?> klazz) {
- error();
- }
-
- @Override
- public final void setString(String key, String value) {
- error();
- }
-
- @Override
- public final void setInteger(String key, int value) {
- error();
- }
-
- @Override
- public final void setLong(String key, long value) {
- error();
- }
-
- @Override
- public final void setBoolean(String key, boolean value) {
- error();
- }
-
- @Override
- public final void setFloat(String key, float value) {
- error();
- }
-
- @Override
- public final void setDouble(String key, double value) {
- error();
- }
-
- @Override
- public final void setBytes(String key, byte[] bytes) {
- error();
- }
-
- @Override
public final void addAll(Configuration other) {
error();
}
@@ -89,12 +50,11 @@ public class UnmodifiableConfiguration extends Configuration {
}
@Override
- <T> void setValueInternal(String key, T value){
+ final <T> void setValueInternal(String key, T value){
error();
}
- private final void error(){
- throw new UnsupportedOperationException("The unmodifiable configuration object doesn't allow set methods.");
+ private void error(){
+ throw new UnsupportedOperationException("The configuration is unmodifiable; its contents cannot be changed.");
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5bf2197b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index e131892..33dde3d 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -23,8 +23,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
+
import org.junit.Test;
/**
@@ -175,4 +175,23 @@ public class ConfigurationTest {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testCopyConstructor() {
+ try {
+ final String key = "theKey";
+
+ Configuration cfg1 = new Configuration();
+ cfg1.setString(key, "value");
+
+ Configuration cfg2 = new Configuration(cfg1);
+ cfg2.setString(key, "another value");
+
+ assertEquals("value", cfg1.getString(key, ""));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5bf2197b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
index 302b72b..3ea52b8 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
@@ -18,29 +18,76 @@
package org.apache.flink.configuration;
-
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import org.junit.Test;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
/**
* This class verifies that the Unmodifiable Configuration class overrides all setter methods in
* Configuration.
*/
public class UnmodifiableConfigurationTest {
-
- private static Configuration pc = new Configuration();
- private static UnmodifiableConfiguration unConf = new UnmodifiableConfiguration(pc);
- private static Class clazz = unConf.getClass();
-
+
@Test
- public void testOverride() throws Exception{
- for(Method m : clazz.getMethods()){
- if(m.getName().indexOf("set") == 0 || m.getName().indexOf("add") == 0 ) {
- assertEquals(clazz, m.getDeclaringClass());
+ public void testOverrideAddMethods() {
+ try {
+ Class<UnmodifiableConfiguration> clazz = UnmodifiableConfiguration.class;
+ for (Method m : clazz.getMethods()) {
+ if (m.getName().startsWith("add")) {
+ assertEquals(clazz, m.getDeclaringClass());
+ }
}
}
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testExceptionOnSet() {
+ try {
+ Map<Class<?>, Object> parameters = new HashMap<Class<?>, Object>();
+ parameters.put(byte[].class, new byte[0]);
+ parameters.put(Class.class, Object.class);
+ parameters.put(int.class, 0);
+ parameters.put(long.class, 0L);
+ parameters.put(float.class, 0.0f);
+ parameters.put(double.class, 0.0);
+ parameters.put(String.class, "");
+ parameters.put(boolean.class, false);
+
+ Class<UnmodifiableConfiguration> clazz = UnmodifiableConfiguration.class;
+ UnmodifiableConfiguration config = new UnmodifiableConfiguration(new Configuration());
+
+ for (Method m : clazz.getMethods()) {
+ if (m.getName().startsWith("set")) {
+
+ Class<?> parameterClass = m.getParameterTypes()[1];
+ Object parameter = parameters.get(parameterClass);
+ assertNotNull("method " + m + " not covered by test", parameter);
+
+ try {
+ m.invoke(config, "key", parameter);
+ fail("should fail with an exception");
+ }
+ catch (InvocationTargetException e) {
+ assertTrue(e.getTargetException() instanceof UnsupportedOperationException);
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
}
[4/8] flink git commit: [FLINK-2425] [FLINK-2426] [runtime] Add an
unmodifiable config and provide access to task manager configuration and
hostname inside RuntimeEnvironment
Posted by se...@apache.org.
[FLINK-2425] [FLINK-2426] [runtime] Add an unmodifiable config and provide access to task manager configuration and hostname inside RuntimeEnvironment
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8eb9cbf8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8eb9cbf8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8eb9cbf8
Branch: refs/heads/master
Commit: 8eb9cbf88111b0375860a3965e093a736455db79
Parents: 24dee42
Author: Sachin Goel <sa...@gmail.com>
Authored: Wed Jul 29 18:51:58 2015 +0530
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:48:07 2015 +0200
----------------------------------------------------------------------
.../flink/configuration/Configuration.java | 2 +-
.../UnmodifiableConfiguration.java | 100 +++++++++++++++++++
.../UnmodifiableConfigurationTest.java | 46 +++++++++
.../flink/runtime/execution/Environment.java | 10 ++
.../runtime/taskmanager/RuntimeEnvironment.java | 22 +++-
.../apache/flink/runtime/taskmanager/Task.java | 9 +-
.../taskmanager/RuntimeConfiguration.scala | 23 +++++
.../flink/runtime/taskmanager/TaskManager.scala | 7 +-
.../operators/testutils/MockEnvironment.java | 11 ++
.../runtime/taskmanager/TaskAsyncCallTest.java | 10 +-
.../flink/runtime/taskmanager/TaskTest.java | 6 +-
.../runtime/tasks/StreamMockEnvironment.java | 11 ++
12 files changed, 246 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index c095b5f..e9d7621 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -472,7 +472,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters implement
// --------------------------------------------------------------------------------------------
- private <T> void setValueInternal(String key, T value) {
+ <T> void setValueInternal(String key, T value) {
if (key == null) {
throw new NullPointerException("Key must not be null.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
new file mode 100644
index 0000000..b436a53
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unmodifiable version of the Configuration class
+ */
+public class UnmodifiableConfiguration extends Configuration {
+
+ /** The log object used for debugging. */
+ private static final Logger LOG = LoggerFactory.getLogger(UnmodifiableConfiguration.class);
+
+ public UnmodifiableConfiguration(Configuration config) {
+ super();
+ super.addAll(config);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // All setter methods must fail.
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public final void setClass(String key, Class<?> klazz) {
+ error();
+ }
+
+ @Override
+ public final void setString(String key, String value) {
+ error();
+ }
+
+ @Override
+ public final void setInteger(String key, int value) {
+ error();
+ }
+
+ @Override
+ public final void setLong(String key, long value) {
+ error();
+ }
+
+ @Override
+ public final void setBoolean(String key, boolean value) {
+ error();
+ }
+
+ @Override
+ public final void setFloat(String key, float value) {
+ error();
+ }
+
+ @Override
+ public final void setDouble(String key, double value) {
+ error();
+ }
+
+ @Override
+ public final void setBytes(String key, byte[] bytes) {
+ error();
+ }
+
+ @Override
+ public final void addAll(Configuration other) {
+ error();
+ }
+
+ @Override
+ public final void addAll(Configuration other, String prefix) {
+ error();
+ }
+
+ @Override
+ <T> void setValueInternal(String key, T value){
+ error();
+ }
+
+ private final void error(){
+ throw new UnsupportedOperationException("The unmodifiable configuration object doesn't allow set methods.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
new file mode 100644
index 0000000..302b72b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+
+/**
+ * This class verifies that the Unmodifiable Configuration class overrides all setter methods in
+ * Configuration.
+ */
+public class UnmodifiableConfigurationTest {
+
+ private static Configuration pc = new Configuration();
+ private static UnmodifiableConfiguration unConf = new UnmodifiableConfiguration(pc);
+ private static Class clazz = unConf.getClass();
+
+ @Test
+ public void testOverride() throws Exception{
+ for(Method m : clazz.getMethods()){
+ if(m.getName().indexOf("set") == 0 || m.getName().indexOf("add") == 0 ) {
+ assertEquals(clazz, m.getDeclaringClass());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index c561869..af29560 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -72,6 +72,16 @@ public interface Environment {
Configuration getTaskConfiguration();
/**
+ * @return The task manager configuration
+ */
+ Configuration getTaskManagerConfiguration();
+
+ /**
+ * @return Hostname of the task manager
+ */
+ String getHostname();
+
+ /**
* Returns the job-wide configuration object that was attached to the JobGraph.
*
* @return The job-wide configuration
http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 5e276bf..c0dfee6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -75,6 +75,10 @@ public class RuntimeEnvironment implements Environment {
private final AccumulatorRegistry accumulatorRegistry;
+ private Configuration taskManagerConfiguration;
+
+ private String hostname;
+
// ------------------------------------------------------------------------
public RuntimeEnvironment(
@@ -96,7 +100,8 @@ public class RuntimeEnvironment implements Environment {
Map<String, Future<Path>> distCacheEntries,
ResultPartitionWriter[] writers,
InputGate[] inputGates,
- ActorGateway jobManager) {
+ ActorGateway jobManager,
+ RuntimeConfiguration taskManagerConfig) {
checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism);
@@ -119,9 +124,10 @@ public class RuntimeEnvironment implements Environment {
this.writers = checkNotNull(writers);
this.inputGates = checkNotNull(inputGates);
this.jobManager = checkNotNull(jobManager);
+ this.taskManagerConfiguration = checkNotNull(taskManagerConfig).configuration();
+ this.hostname = taskManagerConfig.hostname();
}
-
// ------------------------------------------------------------------------
@Override
@@ -168,7 +174,17 @@ public class RuntimeEnvironment implements Environment {
public Configuration getTaskConfiguration() {
return taskConfiguration;
}
-
+
+ @Override
+ public Configuration getTaskManagerConfiguration(){
+ return taskManagerConfiguration;
+ }
+
+ @Override
+ public String getHostname(){
+ return hostname;
+ }
+
@Override
public ClassLoader getUserClassLoader() {
return userCodeClassLoader;
http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c4f62fb..878a69a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -214,6 +214,9 @@ public class Task implements Runnable {
* initialization, to be memory friendly */
private volatile SerializedValue<StateHandle<?>> operatorState;
+ /** Access to task manager configuration and host names*/
+ private RuntimeConfiguration taskManagerConfig;
+
/**
* <p><b>IMPORTANT:</b> This constructor may not start any work that would need to
* be undone in the case of a failing task deployment.</p>
@@ -227,7 +230,8 @@ public class Task implements Runnable {
ActorGateway jobManagerActor,
FiniteDuration actorAskTimeout,
LibraryCacheManager libraryCache,
- FileCache fileCache)
+ FileCache fileCache,
+ RuntimeConfiguration taskManagerConfig)
{
checkArgument(tdd.getNumberOfSubtasks() > 0);
checkArgument(tdd.getIndexInSubtaskGroup() >= 0);
@@ -258,6 +262,7 @@ public class Task implements Runnable {
this.libraryCache = checkNotNull(libraryCache);
this.fileCache = checkNotNull(fileCache);
this.network = checkNotNull(networkEnvironment);
+ this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>();
@@ -510,7 +515,7 @@ public class Task implements Runnable {
userCodeClassLoader, memoryManager, ioManager,
broadcastVariableManager, accumulatorRegistry,
splitProvider, distributedCacheEntries,
- writers, inputGates, jobManager);
+ writers, inputGates, jobManager, taskManagerConfig);
// let the task code create its readers and writers
invokable.setEnvironment(env);
http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
new file mode 100644
index 0000000..ef0e705
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager
+
+import org.apache.flink.configuration.UnmodifiableConfiguration
+
+case class RuntimeConfiguration(hostname: String, configuration: UnmodifiableConfiguration)
http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0ec1040..3ab271a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -36,7 +36,7 @@ import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
import com.fasterxml.jackson.databind.ObjectMapper
import grizzled.slf4j.Logger
-import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException}
+import org.apache.flink.configuration._
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
@@ -892,7 +892,10 @@ class TaskManager(
jobManagerGateway,
config.timeout,
libCache,
- fileCache)
+ fileCache,
+ new RuntimeConfiguration(
+ self.path.toSerializationFormat,
+ new UnmodifiableConfiguration(config.configuration)))
log.info(s"Received task ${task.getTaskNameWithSubtasks}")
http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index b9cb416..b71b01e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.operators.testutils;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -192,6 +193,16 @@ public class MockEnvironment implements Environment {
}
@Override
+ public Configuration getTaskManagerConfiguration(){
+ return new UnmodifiableConfiguration(new Configuration());
+ }
+
+ @Override
+ public String getHostname(){
+ return "localhost";
+ }
+
+ @Override
public int getNumberOfSubtasks() {
return 1;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 10a33c3..08f0094 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -148,16 +150,20 @@ public class TaskAsyncCallTest {
Collections.<BlobKey>emptyList(),
0);
+ ActorGateway taskManagerGateway = DummyActorGateway.INSTANCE;
return new Task(tdd,
mock(MemoryManager.class),
mock(IOManager.class),
networkEnvironment,
mock(BroadcastVariableManager.class),
- DummyActorGateway.INSTANCE,
+ taskManagerGateway,
DummyActorGateway.INSTANCE,
new FiniteDuration(60, TimeUnit.SECONDS),
libCache,
- mock(FileCache.class));
+ mock(FileCache.class),
+ new RuntimeConfiguration(
+ taskManagerGateway.path(),
+ new UnmodifiableConfiguration(new Configuration())));
}
public static class CheckpointsInOrderInvokable extends AbstractInvokable
http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index c6a8cb8..6d9df6d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
import com.google.common.collect.Maps;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -725,7 +726,10 @@ public class TaskTest {
jobManagerGateway,
new FiniteDuration(60, TimeUnit.SECONDS),
libCache,
- mock(FileCache.class));
+ mock(FileCache.class),
+ new RuntimeConfiguration(
+ taskManagerGateway.path(),
+ new UnmodifiableConfiguration(new Configuration())));
}
private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index bbc64f1..44013ef 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -290,5 +291,15 @@ public class StreamMockEnvironment implements Environment {
@Override
public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
}
+
+ @Override
+ public Configuration getTaskManagerConfiguration(){
+ return new UnmodifiableConfiguration(new Configuration());
+ }
+
+ @Override
+ public String getHostname(){
+ return "localhost";
+ }
}
[8/8] flink git commit: [FLINK-2464] [tests] Change log level of
BufferSpillerTest to "info" to let statements occur in CI logs.
Posted by se...@apache.org.
[FLINK-2464] [tests] Change log level of BufferSpillerTest to "info" to let statements occur in CI logs.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0cd1c7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0cd1c7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0cd1c7d
Branch: refs/heads/master
Commit: d0cd1c7d428a2bde6b955599b247b209240f2a34
Parents: 2cff89e
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 3 19:23:57 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 19:23:57 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/streaming/runtime/io/BufferSpillerTest.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d0cd1c7d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index ca6df16..fbc19ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -183,7 +183,7 @@ public class BufferSpillerTest {
@Test
public void testSpillWhileReading() {
- LOG.debug("Starting SpillWhileReading test");
+ LOG.info("Starting SpillWhileReading test");
try {
final int sequences = 10;
@@ -359,7 +359,7 @@ public class BufferSpillerTest {
int numBuffersAndEvents = nextSequence.numBuffersAndEvents;
int numChannels = nextSequence.numChannels;
- LOG.debug("Reading sequence {}", consumedSequences);
+ LOG.info("Reading sequence {}", consumedSequences);
// consume sequence
seq.open();
[2/8] flink git commit: [hotfix] Remove unused (and broken)
TraversableOnceIterable.
Posted by se...@apache.org.
[hotfix] Remove unused (and broken) TraversableOnceIterable.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24dee42e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24dee42e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24dee42e
Branch: refs/heads/master
Commit: 24dee42e0a68a38e87a5ec53e683c086a47698b3
Parents: b08e30a
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 3 12:33:47 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:48:07 2015 +0200
----------------------------------------------------------------------
.../flink/util/TraversableOnceIterable.java | 42 --------------------
1 file changed, 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/24dee42e/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java
deleted file mode 100644
index 73e3cd6..0000000
--- a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util;
-
-import java.util.Iterator;
-
-public class TraversableOnceIterable<T> implements Iterable<T> {
-
- private final Iterator<T> iterator;
-
- public TraversableOnceIterable(Iterator<T> iterator) {
- if (iterator == null) {
- throw new NullPointerException("The iterator must not be null.");
- }
- this.iterator = iterator;
- }
-
- @Override
- public Iterator<T> iterator() {
- if (iterator != null) {
- return iterator;
- } else {
- throw new TraversableOnceException();
- }
- }
-}
[3/8] flink git commit: [FLINK-2465] [streaming] SocketClientSink
closes connection early
Posted by se...@apache.org.
[FLINK-2465] [streaming] SocketClientSink closes connection early
This closes #972
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b08e30ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b08e30ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b08e30ad
Branch: refs/heads/master
Commit: b08e30ad9a39ed9885f4daa05a08d13eb22ebb8c
Parents: 0693c92
Author: ffbin <86...@qq.com>
Authored: Mon Aug 3 15:50:26 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:48:07 2015 +0200
----------------------------------------------------------------------
.../flink/streaming/api/functions/sink/SocketClientSink.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b08e30ad/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index da8fd7f..adffe5e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -88,7 +88,7 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
*/
private void closeConnection(){
try {
- dataOutputStream.flush();
+ dataOutputStream.close();
client.close();
} catch (IOException e) {
throw new RuntimeException("Error while closing connection with socket server at "
[7/8] flink git commit: [FLINK-2473] [core] Add a timeout to akka
actorsystem shutdown.
Posted by se...@apache.org.
[FLINK-2473] [core] Add a timeout to akka actorsystem shutdown.
This works around a bug in akka where the "awaitTermination()" call freezes indefinitely.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cff89ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cff89ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cff89ed
Branch: refs/heads/master
Commit: 2cff89ed93bbacf8d054399514c1ca4fb8b24730
Parents: c3ef61d
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 3 16:28:34 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:49:48 2015 +0200
----------------------------------------------------------------------
.../src/main/java/org/apache/flink/client/program/Client.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2cff89ed/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index d2022bb..78c82f6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -26,6 +26,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
@@ -441,7 +442,9 @@ public class Client {
}
finally {
actorSystem.shutdown();
- actorSystem.awaitTermination();
+
+ // wait at most for 30 seconds, to work around an occasional akka problem
+ actorSystem.awaitTermination(new FiniteDuration(30, TimeUnit.SECONDS));
}
}
[6/8] flink git commit: [FLINK-2425] [runtime] Cleanup code for
forwarding config and hostname into TaskManager's RuntimeEnvironment
Posted by se...@apache.org.
[FLINK-2425] [runtime] Cleanup code for forwarding config and hostname into TaskManager's RuntimeEnvironment
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3ef61de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3ef61de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3ef61de
Branch: refs/heads/master
Commit: c3ef61de934a9c447ec442449c527ce719ee46c6
Parents: 5bf2197
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 3 17:41:57 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:49:45 2015 +0200
----------------------------------------------------------------------
.../runtime/taskmanager/RuntimeEnvironment.java | 12 ++--
.../apache/flink/runtime/taskmanager/Task.java | 8 +--
.../taskmanager/TaskManagerRuntimeInfo.java | 61 ++++++++++++++++++++
.../taskmanager/RuntimeConfiguration.scala | 23 --------
.../flink/runtime/taskmanager/TaskManager.scala | 8 ++-
.../runtime/taskmanager/TaskAsyncCallTest.java | 6 +-
.../flink/runtime/taskmanager/TaskTest.java | 8 +--
7 files changed, 80 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index c0dfee6..cd6dbd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -75,9 +75,9 @@ public class RuntimeEnvironment implements Environment {
private final AccumulatorRegistry accumulatorRegistry;
- private Configuration taskManagerConfiguration;
+ private final Configuration taskManagerConfiguration;
- private String hostname;
+ private final String hostname;
// ------------------------------------------------------------------------
@@ -95,13 +95,13 @@ public class RuntimeEnvironment implements Environment {
MemoryManager memManager,
IOManager ioManager,
BroadcastVariableManager bcVarManager,
- AccumulatorRegistry accumulatorRegistry,
+ AccumulatorRegistry accumulatorRegistry,
InputSplitProvider splitProvider,
Map<String, Future<Path>> distCacheEntries,
ResultPartitionWriter[] writers,
InputGate[] inputGates,
ActorGateway jobManager,
- RuntimeConfiguration taskManagerConfig) {
+ TaskManagerRuntimeInfo taskManagerInfo) {
checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism);
@@ -124,8 +124,8 @@ public class RuntimeEnvironment implements Environment {
this.writers = checkNotNull(writers);
this.inputGates = checkNotNull(inputGates);
this.jobManager = checkNotNull(jobManager);
- this.taskManagerConfiguration = checkNotNull(taskManagerConfig).configuration();
- this.hostname = taskManagerConfig.hostname();
+ this.taskManagerConfiguration = checkNotNull(taskManagerInfo).getConfiguration();
+ this.hostname = taskManagerInfo.getHostname();
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 878a69a..36de90a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -145,6 +145,9 @@ public class Task implements Runnable {
/** The name of the class that holds the invokable code */
private final String nameOfInvokableClass;
+ /** Access to task manager configuration and host names*/
+ private final TaskManagerRuntimeInfo taskManagerConfig;
+
/** The memory manager to be used by this task */
private final MemoryManager memoryManager;
@@ -214,9 +217,6 @@ public class Task implements Runnable {
* initialization, to be memory friendly */
private volatile SerializedValue<StateHandle<?>> operatorState;
- /** Access to task manager configuration and host names*/
- private RuntimeConfiguration taskManagerConfig;
-
/**
* <p><b>IMPORTANT:</b> This constructor may not start any work that would need to
* be undone in the case of a failing task deployment.</p>
@@ -231,7 +231,7 @@ public class Task implements Runnable {
FiniteDuration actorAskTimeout,
LibraryCacheManager libraryCache,
FileCache fileCache,
- RuntimeConfiguration taskManagerConfig)
+ TaskManagerRuntimeInfo taskManagerConfig)
{
checkArgument(tdd.getNumberOfSubtasks() > 0);
checkArgument(tdd.getIndexInSubtaskGroup() >= 0);
http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
new file mode 100644
index 0000000..8d06f10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Encapsulation of TaskManager runtime information, like hostname and configuration.
+ */
+public class TaskManagerRuntimeInfo implements java.io.Serializable {
+
+ private static final long serialVersionUID = 5598219619760274072L;
+
+ /** host name of the interface that the TaskManager uses to communicate */
+ private final String hostname;
+
+ /** configuration that the TaskManager was started with */
+ private final Configuration configuration;
+
+ /**
+ * Creates a runtime info.
+ * @param hostname The host name of the interface that the TaskManager uses to communicate.
+ * @param configuration The configuration that the TaskManager was started with.
+ */
+ public TaskManagerRuntimeInfo(String hostname, Configuration configuration) {
+ this.hostname = hostname;
+ this.configuration = configuration;
+ }
+
+ /**
+ * Gets host name of the interface that the TaskManager uses to communicate.
+ * @return The host name of the interface that the TaskManager uses to communicate.
+ */
+ public String getHostname() {
+ return hostname;
+ }
+
+ /**
+ * Gets the configuration that the TaskManager was started with.
+ * @return The configuration that the TaskManager was started with.
+ */
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
deleted file mode 100644
index ef0e705..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-import org.apache.flink.configuration.UnmodifiableConfiguration
-
-case class RuntimeConfiguration(hostname: String, configuration: UnmodifiableConfiguration)
http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 3ab271a..cc8b8ba 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -173,6 +173,10 @@ class TaskManager(
private val currentRegistrationSessionID: UUID = UUID.randomUUID()
+ private val runtimeInfo = new TaskManagerRuntimeInfo(
+ connectionInfo.getHostname(),
+ new UnmodifiableConfiguration(config.configuration))
+
// --------------------------------------------------------------------------
// Actor messages and life cycle
// --------------------------------------------------------------------------
@@ -893,9 +897,7 @@ class TaskManager(
config.timeout,
libCache,
fileCache,
- new RuntimeConfiguration(
- self.path.toSerializationFormat,
- new UnmodifiableConfiguration(config.configuration)))
+ runtimeInfo)
log.info(s"Received task ${task.getTaskNameWithSubtasks}")
http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 08f0094..a7d8d8d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -129,7 +128,6 @@ public class TaskAsyncCallTest {
}
private static Task createTask() {
-
LibraryCacheManager libCache = mock(LibraryCacheManager.class);
when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
@@ -161,9 +159,7 @@ public class TaskAsyncCallTest {
new FiniteDuration(60, TimeUnit.SECONDS),
libCache,
mock(FileCache.class),
- new RuntimeConfiguration(
- taskManagerGateway.path(),
- new UnmodifiableConfiguration(new Configuration())));
+ new TaskManagerRuntimeInfo("localhost", new Configuration()));
}
public static class CheckpointsInOrderInvokable extends AbstractInvokable
http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 6d9df6d..0cba533 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -19,9 +19,8 @@
package org.apache.flink.runtime.taskmanager;
import com.google.common.collect.Maps;
+
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -49,6 +48,7 @@ import org.apache.flink.runtime.messages.TaskMessages;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+
import scala.concurrent.duration.FiniteDuration;
import java.lang.reflect.Field;
@@ -727,9 +727,7 @@ public class TaskTest {
new FiniteDuration(60, TimeUnit.SECONDS),
libCache,
mock(FileCache.class),
- new RuntimeConfiguration(
- taskManagerGateway.path(),
- new UnmodifiableConfiguration(new Configuration())));
+ new TaskManagerRuntimeInfo("localhost", new Configuration()));
}
private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {
[5/8] flink git commit: [FLINK-2322] [streaming] Close file streams
to release resources early.
Posted by se...@apache.org.
[FLINK-2322] [streaming] Close file streams to release resources early.
This closes #928
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0693c92b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0693c92b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0693c92b
Branch: refs/heads/master
Commit: 0693c92bdda655e1fbce232038909a7c2a385a22
Parents: fab61a1
Author: tedyu <yu...@gmail.com>
Authored: Tue Jul 21 11:13:17 2015 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:48:07 2015 +0200
----------------------------------------------------------------------
.../flink/api/java/sca/UdfAnalyzerUtils.java | 19 ++++++++++++++++---
.../flink/api/java/utils/ParameterTool.java | 4 +++-
2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0693c92b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
index df1e421..dbfd29e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
@@ -32,6 +32,7 @@ import org.objectweb.asm.tree.analysis.BasicValue;
import org.objectweb.asm.tree.analysis.Value;
import java.io.IOException;
+import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
@@ -114,12 +115,14 @@ public final class UdfAnalyzerUtils {
*/
@SuppressWarnings("unchecked")
public static Object[] findMethodNode(String internalClassName, String name, String desc) {
+ InputStream stream = null;
try {
// iterate through hierarchy and search for method node /
// class that really implements the method
while (internalClassName != null) {
- ClassReader cr = new ClassReader(Thread.currentThread().getContextClassLoader()
- .getResourceAsStream(internalClassName.replace('.', '/') + ".class"));
+ stream = Thread.currentThread().getContextClassLoader()
+ .getResourceAsStream(internalClassName.replace('.', '/') + ".class");
+ ClassReader cr = new ClassReader(stream);
final ClassNode cn = new ClassNode();
cr.accept(cn, 0);
for (MethodNode mn : (List<MethodNode>) cn.methods) {
@@ -129,9 +132,19 @@ public final class UdfAnalyzerUtils {
}
internalClassName = cr.getSuperName();
}
- } catch (IOException e) {
+ }
+ catch (IOException e) {
throw new IllegalStateException("Method '" + name + "' could not be found", e);
}
+ finally {
+ if (stream != null) {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ // best effort cleanup
+ }
+ }
+ }
throw new IllegalStateException("Method '" + name + "' could not be found");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0693c92b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 317dce4..b60a559 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -130,7 +130,9 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
throw new FileNotFoundException("Properties file "+path+" does not exist");
}
Properties props = new Properties();
- props.load(new FileInputStream(propertiesFile));
+ FileInputStream fis = new FileInputStream(propertiesFile);
+ props.load(fis);
+ fis.close();
return fromMap((Map)props);
}