You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/16 15:50:29 UTC
[1/3] flink git commit: [FLINK-4366] Add 'forceNonParallel()' to
stream transformations and enforce parallelism=1 for AllWindowedStream
Repository: flink
Updated Branches:
refs/heads/master ad3454069 -> 9623027af
[FLINK-4366] Add 'forceNonParallel()' to stream transformations and enforce parallelism=1 for AllWindowedStream
This closes #2354
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ffe40657
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ffe40657
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ffe40657
Branch: refs/heads/master
Commit: ffe406570af60057fa2ae318561aa239b99bd648
Parents: 6cdf06c
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Thu Aug 11 16:04:54 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 16 17:49:53 2016 +0200
----------------------------------------------------------------------
.../api/datastream/AllWindowedStream.java | 6 +++---
.../datastream/SingleOutputStreamOperator.java | 20 +++++++++++++++++++-
.../StreamingScalaAPICompletenessTest.scala | 3 +--
3 files changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe40657/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index fa3b90d..4b083c8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -302,7 +302,7 @@ public class AllWindowedStream<T, W extends Window> {
allowedLateness);
}
- return input.transform(opName, resultType, operator).setParallelism(1);
+ return input.transform(opName, resultType, operator).forceNonParallel();
}
/**
@@ -391,7 +391,7 @@ public class AllWindowedStream<T, W extends Window> {
allowedLateness);
}
- return input.transform(opName, resultType, operator).setParallelism(1);
+ return input.transform(opName, resultType, operator).forceNonParallel();
}
/**
@@ -486,7 +486,7 @@ public class AllWindowedStream<T, W extends Window> {
allowedLateness);
}
- return input.transform(opName, resultType, operator).setParallelism(1);
+ return input.transform(opName, resultType, operator).forceNonParallel();
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe40657/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 2c7b5cb..02ea219 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -40,6 +40,9 @@ import static java.util.Objects.requireNonNull;
@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {
+ /** Indicates that this is a non-parallel operator whose parallelism cannot be set to a value other than 1. */
+ protected boolean nonParallel = false;
+
protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
super(environment, transformation);
}
@@ -94,13 +97,28 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
if (parallelism < 1) {
throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
}
-
+ if (nonParallel && parallelism > 1) {
+ throw new IllegalArgumentException("The parallelism of non parallel operator must be 1.");
+ }
transformation.setParallelism(parallelism);
return this;
}
/**
+ * Sets the parallelism of this operator to one
+ * and marks the operator so that its parallelism can no longer be set to a value other than 1.
+ *
+ * @return The operator with a parallelism of one.
+ */
+ @PublicEvolving
+ public SingleOutputStreamOperator<T> forceNonParallel() {
+ transformation.setParallelism(1);
+ nonParallel = true;
+ return this;
+ }
+
+ /**
* Sets the maximum time frequency (ms) for the flushing of the output
* buffer. By default the output buffers flush only when they are full.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe40657/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index f6c5146..7cf6935 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -28,7 +28,6 @@ import org.junit.Test
/**
* This checks whether the streaming Scala API is up to feature parity with the Java API.
- * Implements the {@link ScalaAPICompletenessTest} for streaming.
*/
class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
@@ -40,7 +39,7 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
"org.apache.flink.streaming.api.datastream.DataStream.getType",
"org.apache.flink.streaming.api.datastream.DataStream.copy",
"org.apache.flink.streaming.api.datastream.DataStream.getTransformation",
- "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.copy",
+ "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.forceNonParallel",
"org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment",
"org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment",
"org.apache.flink.streaming.api.datastream.ConnectedStreams.getFirstInput",
[3/3] flink git commit: [hotfix] [tests] Let
SlotCountExceedingParallelismTest use the TestLogger
Posted by se...@apache.org.
[hotfix] [tests] Let SlotCountExceedingParallelismTest use the TestLogger
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9623027a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9623027a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9623027a
Branch: refs/heads/master
Commit: 9623027afc6a149c2323f1f3164a98b72e3b48ef
Parents: ffe4065
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 15 18:51:30 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 16 17:49:55 2016 +0200
----------------------------------------------------------------------
.../runtime/jobmanager/SlotCountExceedingParallelismTest.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9623027a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index e12faf9..49b11b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -30,13 +30,14 @@ import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.types.IntValue;
+import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.BitSet;
-public class SlotCountExceedingParallelismTest {
+public class SlotCountExceedingParallelismTest extends TestLogger {
// Test configuration
private final static int NUMBER_OF_TMS = 2;
[2/3] flink git commit: [FLINK-4388] [core] Make MemorySegmentFactory
check and initialization atomic.
Posted by se...@apache.org.
[FLINK-4388] [core] Make MemorySegmentFactory check and initialization atomic.
This prevents rare race conditions when starting multiple TaskManagers in the same JVM.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6cdf06cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6cdf06cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6cdf06cc
Branch: refs/heads/master
Commit: 6cdf06ccb4c7ac444521ca3c3ff4317a4e947e6d
Parents: ad34540
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 12 12:02:30 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 16 17:49:53 2016 +0200
----------------------------------------------------------------------
.../flink/core/memory/MemorySegmentFactory.java | 49 ++++++++++------
.../core/memory/MemorySegmentFactoryTest.java | 62 ++++++++++++++++++++
.../flink/runtime/taskmanager/TaskManager.scala | 10 +---
3 files changed, 94 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6cdf06cc/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index 882c37d..1e5c3ad 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -22,13 +22,15 @@ import org.apache.flink.annotation.Internal;
import java.nio.ByteBuffer;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A factory for memory segments. The purpose of this factory is to make sure that all memory segments
* for heap data are of the same type. That way, the runtime does not mix the various specializations
* of the {@link org.apache.flink.core.memory.MemorySegment}. Not mixing them has shown to be beneficial
* to method specialization by the JIT and to overall performance.
- * <p>
- * Note that this factory auto-initialized to use {@link org.apache.flink.core.memory.HeapMemorySegment},
+ *
+ * <p>Note that this factory auto-initializes to use {@link org.apache.flink.core.memory.HeapMemorySegment},
* if a request to create a segment comes before the initialization.
*/
@Internal
@@ -36,7 +38,7 @@ public class MemorySegmentFactory {
/** The factory to use */
private static volatile Factory factory;
-
+
/**
* Creates a new memory segment that targets the given heap memory region.
* This method should be used to turn short lived byte arrays into memory segments.
@@ -110,26 +112,28 @@ public class MemorySegmentFactory {
ensureInitialized();
return factory.wrapPooledOffHeapMemory(memory, owner);
}
-
+
// ------------------------------------------------------------------------
-
+
/**
- * Initializes this factory with the given concrete factory.
- *
+ * Initializes this factory with the given concrete factory, iff it is not yet initialized.
+ * This also checks if the factory is already initialized to the exact same concrete factory
+ * as given.
+ *
* @param f The concrete factory to use.
- * @throws java.lang.IllegalStateException Thrown, if this factory has been initialized before.
+ * @return True, if the factory is initialized with the given concrete factory, or if it was already
+ * initialized with the exact same concrete factory. False, if it is already initialized with
+ * a different concrete factory.
*/
- public static void initializeFactory(Factory f) {
- if (f == null) {
- throw new NullPointerException();
- }
-
+ public static boolean initializeIfNotInitialized(Factory f) {
+ checkNotNull(f);
+
synchronized (MemorySegmentFactory.class) {
if (factory == null) {
factory = f;
- }
- else {
- throw new IllegalStateException("Factory has already been initialized");
+ return true;
+ } else {
+ return factory == f;
}
}
}
@@ -151,10 +155,17 @@ public class MemorySegmentFactory {
public static Factory getFactory() {
return factory;
}
-
+
+ /**
+ * Sets the factory to the {@link HeapMemorySegment#FACTORY} if no factory has been initialized yet.
+ */
private static void ensureInitialized() {
if (factory == null) {
- factory = HeapMemorySegment.FACTORY;
+ synchronized (MemorySegmentFactory.class) {
+ if (factory == null) {
+ factory = HeapMemorySegment.FACTORY;
+ }
+ }
}
}
@@ -165,7 +176,7 @@ public class MemorySegmentFactory {
/**
* A concrete factory for memory segments.
*/
- public static interface Factory {
+ public interface Factory {
/**
* Creates a new memory segment that targets the given heap memory region.
http://git-wip-us.apache.org/repos/asf/flink/blob/6cdf06cc/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
new file mode 100644
index 0000000..84105cf
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+
+import static org.junit.Assert.*;
+
+public class MemorySegmentFactoryTest {
+
+ @Test
+ public void testInitializationToHeapSegments() throws Exception {
+ clearSegmentFactory();
+
+ assertFalse(MemorySegmentFactory.isInitialized());
+ assertTrue(MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY));
+ assertTrue(MemorySegmentFactory.isInitialized());
+
+ assertTrue(MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY));
+ assertFalse(MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY));
+
+ assertEquals(HeapMemorySegment.FACTORY, MemorySegmentFactory.getFactory());
+ }
+
+ @Test
+ public void testInitializationToOffHeapSegments() throws Exception {
+ clearSegmentFactory();
+
+ assertFalse(MemorySegmentFactory.isInitialized());
+ assertTrue(MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY));
+ assertTrue(MemorySegmentFactory.isInitialized());
+
+ assertTrue(MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY));
+ assertFalse(MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY));
+
+ assertEquals(HybridMemorySegment.FACTORY, MemorySegmentFactory.getFactory());
+ }
+
+ private static void clearSegmentFactory() throws Exception {
+ Field factoryField = MemorySegmentFactory.class.getDeclaredField("factory");
+ factoryField.setAccessible(true);
+ factoryField.set(null, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6cdf06cc/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index e732214..5a95143 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2054,19 +2054,13 @@ object TaskManager {
// initialize the memory segment factory accordingly
memType match {
case MemoryType.HEAP =>
- if (!MemorySegmentFactory.isInitialized()) {
- MemorySegmentFactory.initializeFactory(HeapMemorySegment.FACTORY)
- }
- else if (MemorySegmentFactory.getFactory() != HeapMemorySegment.FACTORY) {
+ if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
throw new Exception("Memory type is set to heap memory, but memory segment " +
"factory has been initialized for off-heap memory segments")
}
case MemoryType.OFF_HEAP =>
- if (!MemorySegmentFactory.isInitialized()) {
- MemorySegmentFactory.initializeFactory(HybridMemorySegment.FACTORY)
- }
- else if (MemorySegmentFactory.getFactory() != HybridMemorySegment.FACTORY) {
+ if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) {
throw new Exception("Memory type is set to off-heap memory, but memory segment " +
"factory has been initialized for heap memory segments")
}