Posted to commits@flink.apache.org by se...@apache.org on 2019/09/19 12:05:19 UTC

[flink] branch master updated (13af3aa -> e273d8d)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 13af3aa  [hotfix][API/DataStream] Add missing assert for checkpoint lock checks
     new 289e147  [FLINK-13896][build] Set Scala compile target version to the same as Java compile target version
     new 35e57a8  [FLINK-11859][runtime] Small improvement to performance of SpanningRecordSerializer
     new 07f86e6  [FLINK-13449][core] Add ARM architecture to MemoryArchitecture
     new 6f36df6  [FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputFormat before close
     new d98132f  [FLINK-13796][coordination] Remove unused variable from YarnResourceManager
     new 03d7ab6  [hotfix][docs] Fix wrong JavaDoc link in SavepointSerializer
     new e273d8d  [FLINK-13845][javadocs] Drop references to removed 'Checkpointed' interface

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/util/MemoryArchitecture.java  |  2 +-
 .../checkpoint/savepoint/SavepointSerializer.java  |  4 +-
 .../serialization/SpanningRecordSerializer.java    | 23 +++-----
 .../api/scala/operators/ScalaCsvOutputFormat.java  |  1 +
 .../scala/operators/ScalaCsvOutputFormatTest.java  | 62 +++++++++-------------
 .../api/checkpoint/CheckpointedFunction.java       |  1 -
 .../streaming/api/checkpoint/ListCheckpointed.java |  5 +-
 .../org/apache/flink/yarn/YarnResourceManager.java |  3 --
 pom.xml                                            |  1 +
 9 files changed, 42 insertions(+), 60 deletions(-)
 copy flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java => flink-scala/src/test/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormatTest.java (55%)


[flink] 05/07: [FLINK-13796][coordination] Remove unused variable from YarnResourceManager

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d98132f4e286296a87ab3ae879ca09eb98165c27
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Tue Aug 20 13:35:21 2019 +0200

    [FLINK-13796][coordination] Remove unused variable from YarnResourceManager
    
    This closes #9492
---
 .../src/main/java/org/apache/flink/yarn/YarnResourceManager.java       | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index c7cb064..2e1f62b 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -68,7 +68,6 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -125,8 +124,6 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 	/** The number of containers requested, but not yet granted. */
 	private int numPendingContainerRequests;
 
-	private final Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>();
-
 	private final Collection<ResourceProfile> slotsPerWorker;
 
 	private final Resource resource;


[flink] 06/07: [hotfix][docs] Fix wrong JavaDoc link in SavepointSerializer

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 03d7ab66c06bc238ddeedacf6a62e7fd2f605496
Author: chendonglin <ch...@jd.com>
AuthorDate: Mon Sep 2 11:35:21 2019 +0800

    [hotfix][docs] Fix wrong JavaDoc link in SavepointSerializer
    
    This closes #9588
---
 .../flink/runtime/checkpoint/savepoint/SavepointSerializer.java       | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
index 9d0f1e1..71dee6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.runtime.checkpoint.Checkpoints;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -25,7 +27,7 @@ import java.io.IOException;
 /**
  * Serializer for {@link Savepoint} instances.
  *
- * <p>This serializer is used to read/write a savepoint via {@link SavepointStore}.
+ * <p>This serializer is used to read/write a savepoint via {@link Checkpoints}.
  *
  * <p>Version-specific serializers are accessed via the {@link SavepointSerializers} helper.
  *


[flink] 01/07: [FLINK-13896][build] Set Scala compile target version to the same as Java compile target version

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 289e147c489c3d0c28d5ea55c95d2d08b2d781b0
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Mon Sep 16 21:10:58 2019 +0800

    [FLINK-13896][build] Set Scala compile target version to the same as Java compile target version
    
    This closes #9692
---
 pom.xml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pom.xml b/pom.xml
index 23ab1fb..722eed2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1784,6 +1784,7 @@ under the License.
 					<configuration>
 						<args>
 							<arg>-nobootcp</arg>
+							<arg>-target:jvm-${java.version}</arg>
 						</args>
 						<jvmArgs>
 							<arg>-Xss2m</arg>


[flink] 02/07: [FLINK-11859][runtime] Small improvement to performance of SpanningRecordSerializer

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 35e57a8460c7f03010972f587bb24052ea694cce
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Thu Sep 19 11:20:56 2019 +0800

    [FLINK-11859][runtime] Small improvement to performance of SpanningRecordSerializer
    
    This removes the length buffer of SpanningRecordSerializer and serializes the record length to
    the data buffer directly.
    
    This closes #9710
---
 .../serialization/SpanningRecordSerializer.java    | 23 ++++++++--------------
 1 file changed, 8 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index f066679..d6da1ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 
 /**
  * Record serializer which serializes the complete record to an intermediate
@@ -44,18 +43,11 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	/** Intermediate buffer for data serialization (wrapped from {@link #serializationBuffer}). */
 	private ByteBuffer dataBuffer;
 
-	/** Intermediate buffer for length serialization. */
-	private final ByteBuffer lengthBuffer;
-
 	public SpanningRecordSerializer() {
 		serializationBuffer = new DataOutputSerializer(128);
 
-		lengthBuffer = ByteBuffer.allocate(4);
-		lengthBuffer.order(ByteOrder.BIG_ENDIAN);
-
 		// ensure initial state with hasRemaining false (for correct continueWritingWithNextBufferBuilder logic)
 		dataBuffer = serializationBuffer.wrapAsByteBuffer();
-		lengthBuffer.position(4);
 	}
 
 	/**
@@ -72,13 +64,16 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 		}
 
 		serializationBuffer.clear();
-		lengthBuffer.clear();
+		// the initial capacity of the serialization buffer should be no less than 4
+		serializationBuffer.skipBytesToWrite(4);
 
 		// write data and length
 		record.write(serializationBuffer);
 
-		int len = serializationBuffer.length();
-		lengthBuffer.putInt(0, len);
+		int len = serializationBuffer.length() - 4;
+		serializationBuffer.setPosition(0);
+		serializationBuffer.writeInt(len);
+		serializationBuffer.skipBytesToWrite(len);
 
 		dataBuffer = serializationBuffer.wrapAsByteBuffer();
 	}
@@ -92,7 +87,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	 */
 	@Override
 	public SerializationResult copyToBufferBuilder(BufferBuilder targetBuffer) {
-		targetBuffer.append(lengthBuffer);
 		targetBuffer.append(dataBuffer);
 		targetBuffer.commit();
 
@@ -100,7 +94,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	}
 
 	private SerializationResult getSerializationResult(BufferBuilder targetBuffer) {
-		if (dataBuffer.hasRemaining() || lengthBuffer.hasRemaining()) {
+		if (dataBuffer.hasRemaining()) {
 			return SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL;
 		}
 		return !targetBuffer.isFull()
@@ -111,7 +105,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	@Override
 	public void reset() {
 		dataBuffer.position(0);
-		lengthBuffer.position(0);
 	}
 
 	@Override
@@ -122,6 +115,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 
 	@Override
 	public boolean hasSerializedData() {
-		return lengthBuffer.hasRemaining() || dataBuffer.hasRemaining();
+		return dataBuffer.hasRemaining();
 	}
 }
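
Note on the change above: the commit message says the record length is now serialized into the data buffer directly instead of into a separate length buffer. The following is a minimal standalone sketch of that reserve-then-backfill idea using only java.nio.ByteBuffer. The class name LengthPrefixSketch and the method frame are hypothetical names chosen for illustration; this is not Flink's DataOutputSerializer and makes no claim about its internals.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of a backfilled length prefix; not Flink code. */
final class LengthPrefixSketch {

    /** Reserves 4 bytes, writes the payload, then backfills the payload length. */
    static ByteBuffer frame(byte[] payload) {
        // ByteBuffer is big-endian by default, matching the order the old lengthBuffer set explicitly.
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.position(4);              // skip the slot reserved for the length
        buf.put(payload);             // write the record body
        int len = buf.position() - 4; // body length, excluding the 4-byte prefix
        buf.putInt(0, len);           // absolute write; leaves the position untouched
        buf.flip();                   // readable view: [len][payload]
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer framed = frame("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(framed.remaining()); // 9
        System.out.println(framed.getInt(0));   // 5
    }
}
```

With a single buffer there is only one hasRemaining() check to make, which mirrors the simplification visible in getSerializationResult and hasSerializedData in the diff.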


[flink] 04/07: [FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputFormat before close

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f36df64e5a375ae203ba32662e5b01fcc38e340
Author: Ryan Tao <xm...@163.com>
AuthorDate: Fri Aug 16 23:15:36 2019 +0800

    [FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputFormat before close
    
    This closes #9467
---
 .../api/scala/operators/ScalaCsvOutputFormat.java  |  1 +
 .../scala/operators/ScalaCsvOutputFormatTest.java  | 87 ++++++++++++++++++++++
 2 files changed, 88 insertions(+)

diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
index ee5ba65..7fa540d 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
@@ -168,6 +168,7 @@ public class ScalaCsvOutputFormat<T extends Product> extends FileOutputFormat<T>
 	@Override
 	public void close() throws IOException {
 		if (wrt != null) {
+			this.wrt.flush();
 			this.wrt.close();
 		}
 		super.close();
diff --git a/flink-scala/src/test/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormatTest.java b/flink-scala/src/test/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormatTest.java
new file mode 100644
index 0000000..7677ab0
--- /dev/null
+++ b/flink-scala/src/test/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormatTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.operators;
+
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+import scala.Tuple3;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link ScalaCsvOutputFormat}.
+ */
+public class ScalaCsvOutputFormatTest {
+
+	private String path;
+	private ScalaCsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat;
+
+	@Rule
+	public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+	@Before
+	public void setUp() throws Exception {
+		path = tmpFolder.newFile().getAbsolutePath();
+		csvOutputFormat = new ScalaCsvOutputFormat<>(new Path(path));
+		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+		csvOutputFormat.open(0, 1);
+	}
+
+	@Test
+	public void testNullAllow() throws Exception {
+		try {
+			csvOutputFormat.setAllowNullValues(true);
+			csvOutputFormat.writeRecord(new Tuple3<>("One", null, 8));
+		} finally {
+			csvOutputFormat.close();
+		}
+		java.nio.file.Path p = Paths.get(path);
+		Assert.assertTrue(Files.exists(p));
+		List<String> lines = Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8);
+		Assert.assertEquals(1, lines.size());
+		Assert.assertEquals("One,,8", lines.get(0));
+	}
+
+	@Test
+	public void testNullDisallowOnDefault() throws Exception {
+		try {
+			csvOutputFormat.setAllowNullValues(false);
+			csvOutputFormat.writeRecord(new Tuple3<>("One", null, 8));
+			fail("should fail with an exception");
+		} catch (RuntimeException e) {
+			// expected
+		} finally {
+			csvOutputFormat.close();
+		}
+	}
+}
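
Note on the change above: the fix calls flush() on the writer before close(). The sketch below only illustrates what an explicit flush does for a buffered writer in general. It uses java.io.BufferedWriter over a StringWriter as a stand-in sink; the class FlushBeforeCloseSketch and the method writeRow are hypothetical names, and the writer type used inside ScalaCsvOutputFormat is not shown in this diff, so no claim is made about its behavior.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;

/** Hypothetical illustration of flush-then-close; not Flink code. */
final class FlushBeforeCloseSketch {

    /** Returns the sink contents before and after an explicit flush. */
    static String[] writeRow(String row) {
        StringWriter sink = new StringWriter();
        BufferedWriter wrt = new BufferedWriter(sink);
        try {
            wrt.write(row);
            // For rows shorter than the writer's internal buffer, nothing has reached the sink yet.
            String beforeFlush = sink.toString();
            wrt.flush(); // push buffered characters to the sink
            String afterFlush = sink.toString();
            wrt.close();
            return new String[] {beforeFlush, afterFlush};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] result = writeRow("One,,8");
        System.out.println("before flush: '" + result[0] + "'");
        System.out.println("after flush:  '" + result[1] + "'");
    }
}
```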


[flink] 03/07: [FLINK-13449][core] Add ARM architecture to MemoryArchitecture

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07f86e6eeeadefc6621e13987b0158dd0018c614
Author: Yikun Jiang <yi...@gmail.com>
AuthorDate: Tue Jul 30 15:55:33 2019 +0800

    [FLINK-13449][core] Add ARM architecture to MemoryArchitecture
    
    This patch adds 'aarch64' for ARM64/AARCH64 to the known 64 bit memory architectures.
    
    This closes #9273
---
 flink-core/src/main/java/org/apache/flink/util/MemoryArchitecture.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/MemoryArchitecture.java b/flink-core/src/main/java/org/apache/flink/util/MemoryArchitecture.java
index 3672921..87451d4 100755
--- a/flink-core/src/main/java/org/apache/flink/util/MemoryArchitecture.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MemoryArchitecture.java
@@ -60,7 +60,7 @@ public enum MemoryArchitecture {
 	private static MemoryArchitecture getInternal() {
 		// putting these into the method to avoid having objects on the heap that are not needed
 		// any more after initialization
-		final List<String> names64bit = Arrays.asList("amd64", "x86_64");
+		final List<String> names64bit = Arrays.asList("amd64", "x86_64", "aarch64");
 		final List<String> names32bit = Arrays.asList("x86", "i386", "i486", "i586", "i686");
 		final String arch = System.getProperty("os.arch");
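
Note on the change above: the patch adds "aarch64" to the list of known 64-bit values of the os.arch system property. The following standalone sketch shows how such a lookup could classify an architecture string. The class ArchSketch, the enum Width, and the method classify are hypothetical names, and the exact-match lookup via List.contains is an assumption, since the matching logic of MemoryArchitecture is outside the hunk shown here.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of an os.arch lookup; not Flink's MemoryArchitecture. */
final class ArchSketch {

    enum Width { BITS_32, BITS_64, UNKNOWN }

    /** Classifies an os.arch value by exact match against the name lists from the diff. */
    static Width classify(String arch) {
        List<String> names64bit = Arrays.asList("amd64", "x86_64", "aarch64");
        List<String> names32bit = Arrays.asList("x86", "i386", "i486", "i586", "i686");
        if (names64bit.contains(arch)) {
            return Width.BITS_64;
        }
        if (names32bit.contains(arch)) {
            return Width.BITS_32;
        }
        return Width.UNKNOWN; // anything not listed, including null
    }

    public static void main(String[] args) {
        // The printed value depends on the JVM this runs on.
        System.out.println(classify(System.getProperty("os.arch")));
    }
}
```

Before the patch, a JVM reporting "aarch64" would have matched neither list in a lookup of this shape.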
 


[flink] 07/07: [FLINK-13845][javadocs] Drop references to removed 'Checkpointed' interface

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e273d8d89fd47d7d83e9e0e6adec5cb5514a334e
Author: Yun Tang <my...@live.com>
AuthorDate: Tue Sep 3 20:39:27 2019 +0800

    [FLINK-13845][javadocs] Drop references to removed 'Checkpointed' interface
    
    This closes #9604
---
 .../apache/flink/streaming/api/checkpoint/CheckpointedFunction.java  | 1 -
 .../org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java  | 5 ++---
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
index 2c6c569..91bf241 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
@@ -116,7 +116,6 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
  * <h4>Operator State</h4>
  * Checkpointing some state that is part of the function object itself is possible in a simpler way
  * by directly implementing the {@link ListCheckpointed} interface.
- * That mechanism is similar to the previously used {@link Checkpointed} interface.
  *
  * <h4>Keyed State</h4>
  * Access to keyed state is possible via the {@link RuntimeContext}'s methods:
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
index b789a51..f4cc794 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
@@ -27,9 +27,8 @@ import java.io.Serializable;
 import java.util.List;
 
 /**
- * This interface can be implemented by functions that want to store state in checkpoints.
- * It can be used in a similar way as the deprecated {@link Checkpointed} interface, but
- * supports <b>list-style state redistribution</b> for cases when the parallelism of the
+ * This interface can be implemented by functions that want to store state in checkpoints
+ * and supports <b>list-style state redistribution</b> for cases when the parallelism of the
  * transformation is changed.
  *
  * <p>Implementing this interface is a shortcut for obtaining the default {@code ListState}