You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/16 15:50:29 UTC

[1/3] flink git commit: [FLINK-4366] Add 'forceNonParallel()' to stream transformations and enforce parallelism=1 for AllWindowedStream

Repository: flink
Updated Branches:
  refs/heads/master ad3454069 -> 9623027af


[FLINK-4366] Add 'forceNonParallel()' to stream transformations and enforce parallelism=1 for AllWindowedStream

This closes #2354


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ffe40657
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ffe40657
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ffe40657

Branch: refs/heads/master
Commit: ffe406570af60057fa2ae318561aa239b99bd648
Parents: 6cdf06c
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Thu Aug 11 16:04:54 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 16 17:49:53 2016 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |  6 +++---
 .../datastream/SingleOutputStreamOperator.java  | 20 +++++++++++++++++++-
 .../StreamingScalaAPICompletenessTest.scala     |  3 +--
 3 files changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ffe40657/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index fa3b90d..4b083c8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -302,7 +302,7 @@ public class AllWindowedStream<T, W extends Window> {
 					allowedLateness);
 		}
 
-		return input.transform(opName, resultType, operator).setParallelism(1);
+		return input.transform(opName, resultType, operator).forceNonParallel();
 	}
 
 	/**
@@ -391,7 +391,7 @@ public class AllWindowedStream<T, W extends Window> {
 					allowedLateness);
 		}
 
-		return input.transform(opName, resultType, operator).setParallelism(1);
+		return input.transform(opName, resultType, operator).forceNonParallel();
 	}
 
 	/**
@@ -486,7 +486,7 @@ public class AllWindowedStream<T, W extends Window> {
 					allowedLateness);
 		}
 
-		return input.transform(opName, resultType, operator).setParallelism(1);
+		return input.transform(opName, resultType, operator).forceNonParallel();
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe40657/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 2c7b5cb..02ea219 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -40,6 +40,9 @@ import static java.util.Objects.requireNonNull;
 @Public
 public class SingleOutputStreamOperator<T> extends DataStream<T> {
 
+	/** Indicates that this is a non-parallel operator, i.e. its parallelism cannot be set to a value other than 1. **/
+	protected boolean nonParallel = false;
+
 	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
 		super(environment, transformation);
 	}
@@ -94,13 +97,28 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 		if (parallelism < 1) {
 			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
 		}
-
+		if (nonParallel && parallelism > 1) {
+			throw new IllegalArgumentException("The parallelism of non parallel operator must be 1.");
+		}
 		transformation.setParallelism(parallelism);
 
 		return this;
 	}
 
 	/**
+	 * Sets the parallelism of this operator to one
+	 * and marks it so that no parallelism other than 1 can be set afterwards.
+	 *
+	 * @return The operator with its parallelism fixed to one.
+	 */
+	@PublicEvolving
+	public SingleOutputStreamOperator<T> forceNonParallel() {
+		transformation.setParallelism(1);
+		nonParallel = true;
+		return this;
+	}
+
+	/**
 	 * Sets the maximum time frequency (ms) for the flushing of the output
 	 * buffer. By default the output buffers flush only when they are full.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe40657/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index f6c5146..7cf6935 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -28,7 +28,6 @@ import org.junit.Test
 
 /**
  * This checks whether the streaming Scala API is up to feature parity with the Java API.
- * Implements the {@link ScalaAPICompletenessTest} for streaming.
  */
 class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
 
@@ -40,7 +39,7 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       "org.apache.flink.streaming.api.datastream.DataStream.getType",
       "org.apache.flink.streaming.api.datastream.DataStream.copy",
       "org.apache.flink.streaming.api.datastream.DataStream.getTransformation",
-      "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.copy",
+      "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.forceNonParallel",
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment",
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment",
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.getFirstInput",


[3/3] flink git commit: [hotfix] [tests] Let SlotCountExceedingParallelismTest use the TestLogger

Posted by se...@apache.org.
[hotfix] [tests] Let SlotCountExceedingParallelismTest use the TestLogger


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9623027a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9623027a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9623027a

Branch: refs/heads/master
Commit: 9623027afc6a149c2323f1f3164a98b72e3b48ef
Parents: ffe4065
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 15 18:51:30 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 16 17:49:55 2016 +0200

----------------------------------------------------------------------
 .../runtime/jobmanager/SlotCountExceedingParallelismTest.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9623027a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index e12faf9..49b11b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -30,13 +30,14 @@ import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.types.IntValue;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.BitSet;
 
-public class SlotCountExceedingParallelismTest {
+public class SlotCountExceedingParallelismTest extends TestLogger {
 
 	// Test configuration
 	private final static int NUMBER_OF_TMS = 2;


[2/3] flink git commit: [FLINK-4388] [core] Make MemorySegmentFactory check and initialization atomic.

Posted by se...@apache.org.
[FLINK-4388] [core] Make MemorySegmentFactory check and initialization atomic.

This prevents rare race conditions when starting multiple TaskManagers in the same JVM.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6cdf06cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6cdf06cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6cdf06cc

Branch: refs/heads/master
Commit: 6cdf06ccb4c7ac444521ca3c3ff4317a4e947e6d
Parents: ad34540
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 12 12:02:30 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 16 17:49:53 2016 +0200

----------------------------------------------------------------------
 .../flink/core/memory/MemorySegmentFactory.java | 49 ++++++++++------
 .../core/memory/MemorySegmentFactoryTest.java   | 62 ++++++++++++++++++++
 .../flink/runtime/taskmanager/TaskManager.scala | 10 +---
 3 files changed, 94 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6cdf06cc/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index 882c37d..1e5c3ad 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -22,13 +22,15 @@ import org.apache.flink.annotation.Internal;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A factory for memory segments. The purpose of this factory is to make sure that all memory segments
  * for heap data are of the same type. That way, the runtime does not mix the various specializations
  * of the {@link org.apache.flink.core.memory.MemorySegment}. Not mixing them has shown to be beneficial
  * to method specialization by the JIT and to overall performance.
- * <p>
- * Note that this factory auto-initialized to use {@link org.apache.flink.core.memory.HeapMemorySegment},
+ * 
+ * <p>Note that this factory is auto-initialized to use {@link org.apache.flink.core.memory.HeapMemorySegment},
  * if a request to create a segment comes before the initialization.
  */
 @Internal
@@ -36,7 +38,7 @@ public class MemorySegmentFactory {
 
 	/** The factory to use */
 	private static volatile Factory factory;
-	
+
 	/**
 	 * Creates a new memory segment that targets the given heap memory region.
 	 * This method should be used to turn short lived byte arrays into memory segments.
@@ -110,26 +112,28 @@ public class MemorySegmentFactory {
 		ensureInitialized();
 		return factory.wrapPooledOffHeapMemory(memory, owner);
 	}
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	/**
-	 * Initializes this factory with the given concrete factory.
-	 * 
+	 * Initializes this factory with the given concrete factory, iff it is not yet initialized.
+	 * This also checks if the factory is already initialized to the exact same concrete factory
+	 * as given.
+	 *
 	 * @param f The concrete factory to use.
-	 * @throws java.lang.IllegalStateException Thrown, if this factory has been initialized before.
+	 * @return True, if the factory is initialized with the given concrete factory, or if it was already
+	 *         initialized with the exact same concrete factory. False, if it is already initialized with
+	 *         a different concrete factory.
 	 */
-	public static void initializeFactory(Factory f) {
-		if (f == null) {
-			throw new NullPointerException();
-		}
-	
+	public static boolean initializeIfNotInitialized(Factory f) {
+		checkNotNull(f);
+
 		synchronized (MemorySegmentFactory.class) {
 			if (factory == null) {
 				factory = f;
-			}
-			else {
-				throw new IllegalStateException("Factory has already been initialized");
+				return true;
+			} else {
+				return factory == f;
 			}
 		}
 	}
@@ -151,10 +155,17 @@ public class MemorySegmentFactory {
 	public static Factory getFactory() {
 		return factory;
 	}
-	
+
+	/**
+	 * Sets the factory to the {@link HeapMemorySegment#FACTORY} if no factory has been initialized yet.
+	 */
 	private static void ensureInitialized() {
 		if (factory == null) {
-			factory = HeapMemorySegment.FACTORY;
+			synchronized (MemorySegmentFactory.class) {
+				if (factory == null) {
+					factory = HeapMemorySegment.FACTORY;
+				}
+			}
 		}
 	}
 
@@ -165,7 +176,7 @@ public class MemorySegmentFactory {
 	/**
 	 * A concrete factory for memory segments.
 	 */
-	public static interface Factory {
+	public interface Factory {
 
 		/**
 		 * Creates a new memory segment that targets the given heap memory region.

http://git-wip-us.apache.org/repos/asf/flink/blob/6cdf06cc/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
new file mode 100644
index 0000000..84105cf
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+
+import static org.junit.Assert.*;
+
+public class MemorySegmentFactoryTest {
+
+	@Test
+	public void testInitializationToHeapSegments() throws Exception {
+		clearSegmentFactory();
+
+		assertFalse(MemorySegmentFactory.isInitialized());
+		assertTrue(MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY));
+		assertTrue(MemorySegmentFactory.isInitialized());
+
+		assertTrue(MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY));
+		assertFalse(MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY));
+
+		assertEquals(HeapMemorySegment.FACTORY, MemorySegmentFactory.getFactory());
+	}
+
+	@Test
+	public void testInitializationToOffHeapSegments() throws Exception {
+		clearSegmentFactory();
+
+		assertFalse(MemorySegmentFactory.isInitialized());
+		assertTrue(MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY));
+		assertTrue(MemorySegmentFactory.isInitialized());
+
+		assertTrue(MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY));
+		assertFalse(MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY));
+
+		assertEquals(HybridMemorySegment.FACTORY, MemorySegmentFactory.getFactory());
+	}
+
+	private static void clearSegmentFactory() throws Exception {
+		Field factoryField = MemorySegmentFactory.class.getDeclaredField("factory");
+		factoryField.setAccessible(true);
+		factoryField.set(null, null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cdf06cc/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index e732214..5a95143 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2054,19 +2054,13 @@ object TaskManager {
     // initialize the memory segment factory accordingly
     memType match {
       case MemoryType.HEAP =>
-        if (!MemorySegmentFactory.isInitialized()) {
-          MemorySegmentFactory.initializeFactory(HeapMemorySegment.FACTORY)
-        }
-        else if (MemorySegmentFactory.getFactory() != HeapMemorySegment.FACTORY) {
+        if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
           throw new Exception("Memory type is set to heap memory, but memory segment " +
             "factory has been initialized for off-heap memory segments")
         }
 
       case MemoryType.OFF_HEAP =>
-        if (!MemorySegmentFactory.isInitialized()) {
-          MemorySegmentFactory.initializeFactory(HybridMemorySegment.FACTORY)
-        }
-        else if (MemorySegmentFactory.getFactory() != HybridMemorySegment.FACTORY) {
+        if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) {
           throw new Exception("Memory type is set to off-heap memory, but memory segment " +
             "factory has been initialized for heap memory segments")
         }