Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 14:28:14 UTC

[08/13] flink git commit: [FLINK-5969] Add BucketingSinkFrom12MigrationTest

[FLINK-5969] Add BucketingSinkFrom12MigrationTest

The binary snapshots have been created on the Flink 1.2 branch.
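
In outline the new test follows the usual two-phase snapshot-migration pattern: the
(normally @Ignore'd) writeSnapshot() method is run on the Flink 1.2 branch to produce the
binary snapshot under src/test/resources, and testRestore() then restores that snapshot on
the current branch. A condensed sketch of that pattern, using the same harness calls as the
test code in the diff below (illustrative only, not additional committed code):

    // Phase 1: run manually on the Flink 1.2 branch to (re)generate the snapshot file.
    BucketingSink<String> sink = new BucketingSink<String>("/tmp/bucketing-out") // illustrative path
        .setWriter(new StringWriter<String>());
    OneInputStreamOperatorTestHarness<String, Object> harness =
        new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
    harness.setup();
    harness.open();
    harness.processElement(new StreamRecord<>("test1", 0L));
    OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
    OperatorSnapshotUtil.writeStateHandle(
        snapshot, "src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot");
    harness.close();

    // Phase 2: the actual test, run on the current branch against that resource file.
    OneInputStreamOperatorTestHarness<String, Object> restoreHarness =
        new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
    restoreHarness.setup();
    restoreHarness.initializeState(
        OperatorSnapshotUtil.readStateHandle(
            OperatorSnapshotUtil.getResourceFilename(
                "bucketing-sink-migration-test-flink1.2-snapshot")));
    restoreHarness.open();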


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb7793f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb7793f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb7793f0

Branch: refs/heads/master
Commit: fb7793f033cfa0d6d77ef25a6c518a5a203ebb82
Parents: 2c6377f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Apr 24 17:50:59 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 16:25:57 2017 +0200

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSink.java  |   6 +
 .../BucketingSinkFrom12MigrationTest.java       | 223 +++++++++++++++++++
 ...keting-sink-migration-test-flink1.2-snapshot | Bin 0 -> 1623 bytes
 3 files changed, 229 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb7793f0/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 7dbcda7..20e54b8 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.fs.bucketing;
 
 import org.apache.commons.lang3.time.StopWatch;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -1033,6 +1034,11 @@ public class BucketingSink<T>
 		return this;
 	}
 
+	@VisibleForTesting
+	public State<T> getState() {
+		return state;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Internal Classes
 	// --------------------------------------------------------------------------------------------

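The getState() accessor added above exposes the sink's internal State<T> object so that tests
can inspect it. The migration test in this commit does not call it directly (it inspects the
restored state by overriding initializeState() instead), so the following is only an
illustrative sketch of how a test in the same package could use it:

    // Illustrative only: read the sink's internal state back out after the
    // test harness has opened the operator and processed an element.
    BucketingSink<String> sink = new BucketingSink<String>("/tmp/bucketing-out")
        .setWriter(new StringWriter<String>());
    OneInputStreamOperatorTestHarness<String, Object> harness =
        new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
    harness.setup();
    harness.open();
    harness.processElement(new StreamRecord<>("test1", 0L));
    Assert.assertNotNull(sink.getState());
    harness.close();
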
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7793f0/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java
new file mode 100644
index 0000000..350b7b4
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for checking whether {@link BucketingSink} can restore from snapshots that were taken
+ * with the Flink 1.2 {@link BucketingSink}.
+ *
+ * <p>For regenerating the binary snapshot file you have to run the {@code write*()} method on
+ * the Flink 1.2 branch.
+ */
+
+public class BucketingSinkFrom12MigrationTest {
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static final String PART_PREFIX = "part";
+	private static final String PENDING_SUFFIX = ".pending";
+	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+	private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+	/**
+	 * Manually run this to write binary snapshot data. Remove @Ignore to run.
+	 */
+	@Ignore
+	@Test
+	public void writeSnapshot() throws Exception {
+
+		final File outDir = tempFolder.newFolder();
+
+		BucketingSink<String> sink = new BucketingSink<String>(outDir.getAbsolutePath())
+			.setWriter(new StringWriter<String>())
+			.setBatchSize(5)
+			.setPartPrefix(PART_PREFIX)
+			.setInProgressPrefix("")
+			.setPendingPrefix("")
+			.setValidLengthPrefix("")
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>("test1", 0L));
+		testHarness.processElement(new StreamRecord<>("test2", 0L));
+
+		checkFs(outDir, 1, 1, 0, 0);
+
+		testHarness.processElement(new StreamRecord<>("test3", 0L));
+		testHarness.processElement(new StreamRecord<>("test4", 0L));
+		testHarness.processElement(new StreamRecord<>("test5", 0L));
+
+		checkFs(outDir, 1, 4, 0, 0);
+
+		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+
+		OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot");
+		testHarness.close();
+	}
+
+	@Test
+	public void testRestore() throws Exception {
+		final File outDir = tempFolder.newFolder();
+
+		ValidatingBucketingSink<String> sink = (ValidatingBucketingSink<String>) new ValidatingBucketingSink<String>(outDir.getAbsolutePath())
+			.setWriter(new StringWriter<String>())
+			.setBatchSize(5)
+			.setPartPrefix(PART_PREFIX)
+			.setInProgressPrefix("")
+			.setPendingPrefix("")
+			.setValidLengthPrefix("")
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(sink), 10, 1, 0);
+		testHarness.setup();
+		testHarness.initializeState(
+				OperatorSnapshotUtil.readStateHandle(
+						OperatorSnapshotUtil.getResourceFilename("bucketing-sink-migration-test-flink1.2-snapshot")));
+		testHarness.open();
+
+		assertTrue(sink.initializeCalled);
+
+		testHarness.processElement(new StreamRecord<>("test1", 0L));
+		testHarness.processElement(new StreamRecord<>("test2", 0L));
+
+		checkFs(outDir, 1, 1, 0, 0);
+
+		testHarness.close();
+	}
+
+	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
+		int inProg = 0;
+		int pend = 0;
+		int compl = 0;
+		int val = 0;
+
+		for (File file: FileUtils.listFiles(outDir, null, true)) {
+			if (file.getAbsolutePath().endsWith("crc")) {
+				continue;
+			}
+			String path = file.getPath();
+			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+				inProg++;
+			} else if (path.endsWith(PENDING_SUFFIX)) {
+				pend++;
+			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+				val++;
+			} else if (path.contains(PART_PREFIX)) {
+				compl++;
+			}
+		}
+
+		Assert.assertEquals(inprogress, inProg);
+		Assert.assertEquals(pending, pend);
+		Assert.assertEquals(completed, compl);
+		Assert.assertEquals(valid, val);
+	}
+
+	static class ValidatingBucketingSink<T> extends BucketingSink<T> {
+
+		private static final long serialVersionUID = -4263974081712009141L;
+
+		public boolean initializeCalled = false;
+
+		ValidatingBucketingSink(String basePath) {
+			super(basePath);
+		}
+
+		/**
+	 * The actual paths in this test depend on the binary checkpoint, so if you update the
+	 * checkpoint the paths here have to be updated as well.
+		 */
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+			OperatorStateStore stateStore = context.getOperatorStateStore();
+
+			ListState<State<T>> restoredBucketStates = stateStore.getSerializableListState("bucket-states");
+
+			if (context.isRestored()) {
+
+				for (State<T> states : restoredBucketStates.get()) {
+					for (String bucketPath : states.bucketStates.keySet()) {
+						BucketState state = states.getBucketState(new Path(bucketPath));
+						String current = state.currentFile;
+						long validLength = state.currentFileValidLength;
+
+						Assert.assertEquals("/var/folders/v_/ry2wp5fx0y7c1rvr41xy9_700000gn/T/junit9160378385359106772/junit479663758539998903/1970-01-01--01/part-0-4", current);
+						Assert.assertEquals(6, validLength);
+
+						List<String> pendingFiles = state.pendingFiles;
+						assertTrue(pendingFiles.isEmpty());
+
+						final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
+						Assert.assertEquals(1, pendingFilesPerCheckpoint.size());
+
+						for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) {
+							long checkpoint = entry.getKey();
+							List<String> files = entry.getValue();
+
+							Assert.assertEquals(0L, checkpoint);
+							Assert.assertEquals(4, files.size());
+
+							for (int i = 0; i < 4; i++) {
+								Assert.assertEquals(
+										"/var/folders/v_/ry2wp5fx0y7c1rvr41xy9_700000gn/T/junit9160378385359106772/junit479663758539998903/1970-01-01--01/part-0-" + i,
+										files.get(i));
+							}
+						}
+					}
+				}
+			}
+
+			initializeCalled = true;
+			super.initializeState(context);
+		}
+	}
+}

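For readers skimming the test: the four integer arguments of the private checkFs() helper are
positional counts of in-progress, pending, completed and valid-length files (Hadoop .crc files
are ignored). The checkFs(outDir, 1, 1, 0, 0) call in testRestore() therefore expects exactly
one ".in-progress" file, one ".pending" file, no completed part files and no ".valid" files
after restoring the 1.2 snapshot and processing two more elements. A spelled-out, illustrative
form of that one assertion (assuming the same constants and imports as the test class, plus
java.util.Collection):

    private static void assertOneInProgressAndOnePending(File outDir) {
        Collection<File> files = FileUtils.listFiles(outDir, null, true);
        long inProgress = files.stream()
                .filter(f -> f.getPath().endsWith(IN_PROGRESS_SUFFIX)).count();
        long pending = files.stream()
                .filter(f -> f.getPath().endsWith(PENDING_SUFFIX)).count();
        Assert.assertEquals(1, inProgress);  // one bucket file still being written
        Assert.assertEquals(1, pending);     // one file rolled over, awaiting commit
    }
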
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7793f0/flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot b/flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot
new file mode 100644
index 0000000..a541bad
Binary files /dev/null and b/flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot differ