Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/15 12:37:01 UTC

[GitHub] [flink] godfreyhe commented on a change in pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

godfreyhe commented on a change in pull request #12867:
URL: https://github.com/apache/flink/pull/12867#discussion_r454930701



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
##########
@@ -40,18 +43,37 @@
 	public CollectResultIterator(
 			CompletableFuture<OperatorID> operatorIdFuture,
 			TypeSerializer<T> serializer,
-			String accumulatorName) {
-		this.fetcher = new CollectResultFetcher<>(operatorIdFuture, serializer, accumulatorName);
+			String accumulatorName,
+			CheckpointConfig checkpointConfig) {
+		if (checkpointConfig.getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE) {
+			if (checkpointConfig.getCheckpointInterval() >= CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME) {
+				this.fetcher = new CollectResultFetcher<>(
+					new CheckpointedCollectResultBuffer<>(serializer),
+					operatorIdFuture,
+					accumulatorName);
+			} else {
+				this.fetcher = new CollectResultFetcher<>(
+					new UncheckpointedCollectResultBuffer<>(serializer, false),
+					operatorIdFuture,
+					accumulatorName);
+			}

Review comment:
       I think this branch is unnecessary: a checkpoint interval smaller than `MINIMAL_CHECKPOINT_TIME` is illegal, and several places already validate this, e.g. `CheckpointConfig.setCheckpointInterval`.
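
       To illustrate the argument, here is a minimal, self-contained sketch of setter-side validation. `IntervalValidationSketch`, its `rejects` helper, and the value `10` are hypothetical stand-ins chosen for illustration; this is not Flink's `CheckpointConfig`. It only shows why an interval that has passed such a setter can never be below the minimum.

```java
// Hypothetical stand-in for illustration only; this is NOT Flink's CheckpointConfig.
final class IntervalValidationSketch {

    // Assumed minimum in milliseconds; the real constant is defined in Flink.
    static final long MINIMAL_CHECKPOINT_TIME = 10;

    private long checkpointInterval;

    // The setter rejects any interval below the minimum, as the comment describes.
    void setCheckpointInterval(long interval) {
        if (interval < MINIMAL_CHECKPOINT_TIME) {
            throw new IllegalArgumentException(
                "Checkpoint interval must be at least " + MINIMAL_CHECKPOINT_TIME + " ms");
        }
        this.checkpointInterval = interval;
    }

    long getCheckpointInterval() {
        return checkpointInterval;
    }

    // Helper for the demo: reports whether the setter refuses the given interval.
    static boolean rejects(long interval) {
        try {
            new IntervalValidationSketch().setCheckpointInterval(interval);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("5 ms rejected: " + rejects(5));
        System.out.println("500 ms rejected: " + rejects(500));
    }
}
```

       The sketch only covers values that go through the setter. Whether an interval that was never set can still reach the constructor is a separate question that it does not answer.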

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionRandomITCase.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper;
+import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper.ACCUMULATOR_NAME;
+
+/**
+ * Random IT cases for {@link CollectSinkFunction}.
+ * It will perform random insert, random checkpoint and random restart.
+ */
+public class CollectSinkFunctionRandomITCase extends TestLogger {
+
+	private static final int MAX_RESULTS_PER_BATCH = 3;
+	private static final JobID TEST_JOB_ID = new JobID();
+	private static final OperatorID TEST_OPERATOR_ID = new OperatorID();
+
+	private static final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE;
+
+	private CollectSinkFunctionTestWrapper<Integer> functionWrapper;
+	private boolean jobFinished;
+
+	@Test
+	public void testUncheckpointedFunction() throws Exception {
+		// run multiple times for this random test
+		for (int testCount = 30; testCount > 0; testCount--) {
+			functionWrapper = new CollectSinkFunctionTestWrapper<>(serializer, MAX_RESULTS_PER_BATCH * 4);
+			jobFinished = false;
+
+			List<Integer> expected = new ArrayList<>();
+			for (int i = 0; i < 50; i++) {
+				expected.add(i);
+			}
+			Thread feeder = new ThreadWithException(new UncheckpointedDataFeeder(expected));
+
+			List<Integer> actual = runFunctionRandomTest(feeder);
+			assertResultsEqualAfterSort(expected, actual);
+
+			functionWrapper.closeWrapper();
+		}
+	}
+
+	@Test
+	public void testCheckpointedFunction() throws Exception {
+		// run multiple times for this random test
+		for (int testCount = 30; testCount > 0; testCount--) {
+			functionWrapper = new CollectSinkFunctionTestWrapper<>(serializer, MAX_RESULTS_PER_BATCH * 4);
+			jobFinished = false;
+
+			List<Integer> expected = new ArrayList<>();
+			for (int i = 0; i < 50; i++) {
+				expected.add(i);
+			}
+			Thread feeder = new ThreadWithException(new CheckpointedDataFeeder(expected));
+
+			List<Integer> actual = runFunctionRandomTest(feeder);
+			assertResultsEqualAfterSort(expected, actual);
+
+			functionWrapper.closeWrapper();
+		}
+	}
+
+	private List<Integer> runFunctionRandomTest(Thread feeder) throws Exception {
+		CollectClient collectClient = new CollectClient();
+		Thread client = new ThreadWithException(collectClient);
+
+		Thread.UncaughtExceptionHandler exceptionHandler = (t, e) -> {
+			feeder.interrupt();
+			client.interrupt();
+			e.printStackTrace();
+		};
+		feeder.setUncaughtExceptionHandler(exceptionHandler);
+		client.setUncaughtExceptionHandler(exceptionHandler);
+
+		feeder.start();
+		client.start();
+		feeder.join();
+		client.join();
+
+		return collectClient.results;
+	}
+
+	private void assertResultsEqualAfterSort(List<Integer> expected, List<Integer> actual) {
+		Collections.sort(expected);
+		Collections.sort(actual);
+		Assert.assertThat(actual, CoreMatchers.is(expected));
+	}
+
+	/**
+	 * A {@link RunnableWithException} feeding data to the function. It will fail when half of the data is fed.
+	 */
+	private class UncheckpointedDataFeeder implements RunnableWithException {
+
+		private LinkedList<Integer> data;
+		private final List<Integer> checkpointedData;

Review comment:
       Why does `UncheckpointedDataFeeder` have a field named `checkpointedData`?

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectSinkFunctionTestWrapper.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect.utils;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
+import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A wrapper class for creating, checkpointing and closing
+ * {@link org.apache.flink.streaming.api.operators.collect.CollectSinkFunction} for tests.
+ */
+public class CollectSinkFunctionTestWrapper<IN> {
+
+	public static final String ACCUMULATOR_NAME = "tableCollectAccumulator";
+
+	private static final int SOCKET_TIMEOUT_MILLIS = 1000;
+	private static final int FUTURE_TIMEOUT_MILLIS = 10000;
+	private static final int MAX_RETIRES = 100;
+
+	private final TypeSerializer<IN> serializer;
+	private final int maxBytesPerBatch;
+
+	private final IOManager ioManager;
+	private final StreamingRuntimeContext runtimeContext;
+	private final MockOperatorEventGateway gateway;
+	private final CollectSinkOperatorCoordinator coordinator;
+	private final MockFunctionInitializationContext functionInitializationContext;
+
+	private CollectSinkFunction<IN> function;
+
+	public CollectSinkFunctionTestWrapper(TypeSerializer<IN> serializer, int maxBytesPerBatch) throws Exception {
+		this.serializer = serializer;
+		this.maxBytesPerBatch = maxBytesPerBatch;
+
+		this.ioManager = new IOManagerAsync();
+		MockEnvironment environment = new MockEnvironmentBuilder()
+			.setTaskName("mockTask")
+			.setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+			.setIOManager(ioManager)
+			.build();
+		this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
+		this.gateway = new MockOperatorEventGateway();
+
+		this.coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
+		this.coordinator.start();
+
+		this.functionInitializationContext = new MockFunctionInitializationContext();
+	}
+
+	public void closeWrapper() throws Exception {
+		coordinator.close();
+		ioManager.close();
+	}
+
+	public CollectSinkOperatorCoordinator getCoordinator() {
+		return coordinator;
+	}
+
+	public void openFunction() throws Exception {
+		function = new CollectSinkFunction<>(serializer, maxBytesPerBatch, ACCUMULATOR_NAME);
+		function.setRuntimeContext(runtimeContext);
+		function.setOperatorEventGateway(gateway);
+		function.open(new Configuration());
+		coordinator.handleEventFromOperator(0, gateway.getNextEvent());
+	}
+
+	public void openFunctionWithState() throws Exception {
+		functionInitializationContext.getOperatorStateStore().revertToLastSuccessCheckpoint();
+		function = new CollectSinkFunction<>(serializer, maxBytesPerBatch, ACCUMULATOR_NAME);
+		function.setRuntimeContext(runtimeContext);
+		function.setOperatorEventGateway(gateway);
+		function.initializeState(functionInitializationContext);
+		function.open(new Configuration());
+		coordinator.handleEventFromOperator(0, gateway.getNextEvent());
+	}
+
+	public void invoke(IN record) throws Exception {
+		function.invoke(record, null);
+	}
+
+	public void checkpointFunction(long checkpointId) throws Exception {
+		function.snapshotState(new MockFunctionSnapshotContext(checkpointId));
+		functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
+	}
+
+	public void checkpointComplete(long checkpointId) {
+		function.notifyCheckpointComplete(checkpointId);
+		functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
+	}
+
+	public void closeFunctionNormally() throws Exception {
+		// this is a normal shutdown
+		function.accumulateFinalResults();
+		function.close();
+	}
+
+	public void closeFuntionAbnormally() throws Exception {

Review comment:
       typo: `Funtion` should be `Function`

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/UncheckpointedCollectResultBuffer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.io.IOException;
+
+/**
+ * A buffer which encapsulates the logic of dealing with the response from the {@link CollectSinkFunction}.
+ * It ignores the checkpoint related fields in the response.
+ * See Java doc of {@link CollectSinkFunction} for explanation of this communication protocol.
+ */
+public class UncheckpointedCollectResultBuffer<T> extends AbstractCollectResultBuffer<T> {
+
+	private final boolean failureTolerance;
+
+	public UncheckpointedCollectResultBuffer(TypeSerializer<T> serializer, boolean failureTolerance) {
+		super(serializer);
+		this.failureTolerance = failureTolerance;
+	}
+
+	@Override
+	public void dealWithResponse(CollectCoordinationResponse response, long responseOffset) throws IOException {
+		String responseVersion = response.getVersion();
+
+		if (!version.equals(responseVersion)) {
+			if (!INIT_VERSION.equals(version) && !failureTolerance) {
+				// sink restarted but we do not tolerate failure
+				throw new RuntimeException("Job restarted");
+			}
+
+			reset();
+			version = responseVersion;
+		}
+
+		addResults(response, responseOffset);
+		// the results are instantly visible by users
+		userVisibleTail = offset;

Review comment:
       Could `addResults` be changed into a utility method? That way the variables (e.g. `offset`) are all handled in one method, which makes this method more readable.
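
       One possible reading of this suggestion, as a self-contained sketch: the helper is stateless and returns the new offset, so the caller assigns `offset` and `userVisibleTail` side by side. `ResultBufferSketch`, `appendNewResults`, and the simplified `dealWithResponse` signature are hypothetical names for illustration, not the PR's code, and the sketch assumes `responseOffset` is never larger than the current `offset`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names and fields are illustrative, not the PR's classes.
final class ResultBufferSketch<T> {

    private final List<T> buffer = new ArrayList<>();
    private long offset = 0;          // number of results accepted so far
    private long userVisibleTail = 0; // results below this offset are visible to users

    // Stateless helper: appends the results the buffer has not yet seen
    // and returns the new offset. It touches no fields of the buffer object.
    static <T> long appendNewResults(
            List<T> buffer, List<T> results, long offset, long responseOffset) {
        // results.get(0) sits at responseOffset; skip what is already buffered.
        // Assumption: responseOffset <= offset, so there is never a gap.
        int skip = (int) Math.max(0, offset - responseOffset);
        if (skip >= results.size()) {
            return offset; // nothing new in this response
        }
        List<T> fresh = results.subList(skip, results.size());
        buffer.addAll(fresh);
        return offset + fresh.size();
    }

    void dealWithResponse(List<T> results, long responseOffset) {
        // Both variables are updated here, in one method.
        offset = appendNewResults(buffer, results, offset, responseOffset);
        userVisibleTail = offset;
    }

    List<T> getBuffer() {
        return buffer;
    }

    long getOffset() {
        return offset;
    }

    long getUserVisibleTail() {
        return userVisibleTail;
    }
}
```

       With this shape, a reader of `dealWithResponse` sees every state change in one place, at the cost of passing the buffer and offsets to the helper explicitly.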

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectSinkFunctionTestWrapper.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect.utils;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
+import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A wrapper class for creating, checkpointing and closing
+ * {@link org.apache.flink.streaming.api.operators.collect.CollectSinkFunction} for tests.
+ */
+public class CollectSinkFunctionTestWrapper<IN> {
+
+	public static final String ACCUMULATOR_NAME = "tableCollectAccumulator";
+
+	private static final int SOCKET_TIMEOUT_MILLIS = 1000;
+	private static final int FUTURE_TIMEOUT_MILLIS = 10000;
+	private static final int MAX_RETIRES = 100;
+
+	private final TypeSerializer<IN> serializer;
+	private final int maxBytesPerBatch;
+
+	private final IOManager ioManager;
+	private final StreamingRuntimeContext runtimeContext;
+	private final MockOperatorEventGateway gateway;
+	private final CollectSinkOperatorCoordinator coordinator;
+	private final MockFunctionInitializationContext functionInitializationContext;
+
+	private CollectSinkFunction<IN> function;
+
+	public CollectSinkFunctionTestWrapper(TypeSerializer<IN> serializer, int maxBytesPerBatch) throws Exception {
+		this.serializer = serializer;
+		this.maxBytesPerBatch = maxBytesPerBatch;
+
+		this.ioManager = new IOManagerAsync();
+		MockEnvironment environment = new MockEnvironmentBuilder()
+			.setTaskName("mockTask")
+			.setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+			.setIOManager(ioManager)
+			.build();
+		this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
+		this.gateway = new MockOperatorEventGateway();
+
+		this.coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
+		this.coordinator.start();
+
+		this.functionInitializationContext = new MockFunctionInitializationContext();
+	}
+
+	public void closeWrapper() throws Exception {
+		coordinator.close();
+		ioManager.close();
+	}
+
+	public CollectSinkOperatorCoordinator getCoordinator() {
+		return coordinator;
+	}
+
+	public void openFunction() throws Exception {
+		function = new CollectSinkFunction<>(serializer, maxBytesPerBatch, ACCUMULATOR_NAME);
+		function.setRuntimeContext(runtimeContext);
+		function.setOperatorEventGateway(gateway);
+		function.open(new Configuration());
+		coordinator.handleEventFromOperator(0, gateway.getNextEvent());
+	}
+
+	public void openFunctionWithState() throws Exception {
+		functionInitializationContext.getOperatorStateStore().revertToLastSuccessCheckpoint();
+		function = new CollectSinkFunction<>(serializer, maxBytesPerBatch, ACCUMULATOR_NAME);
+		function.setRuntimeContext(runtimeContext);
+		function.setOperatorEventGateway(gateway);
+		function.initializeState(functionInitializationContext);
+		function.open(new Configuration());
+		coordinator.handleEventFromOperator(0, gateway.getNextEvent());
+	}
+
+	public void invoke(IN record) throws Exception {
+		function.invoke(record, null);
+	}
+
+	public void checkpointFunction(long checkpointId) throws Exception {
+		function.snapshotState(new MockFunctionSnapshotContext(checkpointId));
+		functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
+	}
+
+	public void checkpointComplete(long checkpointId) {
+		function.notifyCheckpointComplete(checkpointId);
+		functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
+	}
+
+	public void closeFunctionNormally() throws Exception {
+		// this is a normal shutdown
+		function.accumulateFinalResults();
+		function.close();
+	}
+
+	public void closeFuntionAbnormally() throws Exception {
+		// this is an exceptional shutdown
+		function.close();
+		coordinator.subtaskFailed(0, null);
+	}
+
+	public CollectCoordinationResponse sendRequestAndGetResponse(String version, long offset) throws Exception {
+		CollectCoordinationResponse response;
+		for (int i = 0; i < MAX_RETIRES; i++) {
+			response = sendRequest(version, offset);
+			if (response.getLastCheckpointedOffset() >= 0) {
+				return response;
+			}
+		}
+		throw new RuntimeException("Too many retries in sendRequestAndGetValidResponse");
+	}
+
+	private CollectCoordinationResponse sendRequest(String version, long offset) throws Exception {
+		CollectCoordinationRequest request = new CollectCoordinationRequest(version, offset);
+		// we add a timeout to not block the tests if it fails
+		return ((CollectCoordinationResponse) coordinator
+			.handleCoordinationRequest(request).get(FUTURE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+	}
+
+	public Tuple2<Long, CollectCoordinationResponse> getAccumualtorResults() throws Exception {

Review comment:
       typo: `Accumualtor` should be `Accumulator`



