You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/20 08:20:52 UTC

[GitHub] [flink] becketqin commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

becketqin commented on a change in pull request #11554:
URL: https://github.com/apache/flink/pull/11554#discussion_r410904702



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
##########
@@ -0,0 +1,230 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the {@link Source}.
+ *
+ * <p>The <code>SourceCoordinator</code> provides an event loop style thread model to interact with
+ * the Flink runtime. The coordinator ensures that all the state manipulations are made by its event loop
+ * thread. It also helps keep track of the necessary split assignments history per subtask to simplify the
+ * {@link SplitEnumerator} implementation.
+ *
+ * <p>The coordinator maintains a {@link org.apache.flink.api.connector.source.SplitEnumeratorContext
+ * SplitEnumeratorContxt} and shares it with the enumerator. When the coordinator receives an action
+ * request from the Flink runtime, it sets up the context, and calls corresponding method of the
+ * SplitEnumerator to take actions.
+ */
+@Internal
+public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator {
+	private static final Logger LOG = LoggerFactory.getLogger(OperatorCoordinator.class);
+	/** A single-thread executor to handle all the changes to the coordinator. */
+	private final ExecutorService coordinatorExecutor;
+	/** The Source that is associated with this SourceCoordinator. */
+	private final Source<?, SplitT, EnumChkT> source;
+	/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
+	private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
+	/** The serializer for the SourceSplit of the associated Source. */
+	private final SimpleVersionedSerializer<SplitT> splitSerializer;
+	/** The context containing the states of the coordinator. */
+	private final SourceCoordinatorContext<SplitT> context;
+	/** The split enumerator created from the associated Source. */
+	private SplitEnumerator<SplitT, EnumChkT> enumerator;
+	/** A flag marking whether the coordinator has started. */
+	private boolean started;
+
+	public SourceCoordinator(
+			ExecutorService coordinatorExecutor,
+			Source<?, SplitT, EnumChkT> source,
+			SourceCoordinatorContext<SplitT> context) {
+		this.coordinatorExecutor = coordinatorExecutor;
+		this.source = source;
+		this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
+		this.splitSerializer = source.getSplitSerializer();
+		this.context = context;
+		this.enumerator = source.createEnumerator(context);
+		this.started = false;
+	}
+
+	@Override
+	public void start() throws Exception {
+		LOG.info("Starting split enumerator.");

Review comment:
       I like the idea. Makes a lot of sense.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
##########
@@ -0,0 +1,230 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the {@link Source}.
+ *
+ * <p>The <code>SourceCoordinator</code> provides an event loop style thread model to interact with
+ * the Flink runtime. The coordinator ensures that all the state manipulations are made by its event loop
+ * thread. It also helps keep track of the necessary split assignments history per subtask to simplify the
+ * {@link SplitEnumerator} implementation.
+ *
+ * <p>The coordinator maintains a {@link org.apache.flink.api.connector.source.SplitEnumeratorContext
+ * SplitEnumeratorContxt} and shares it with the enumerator. When the coordinator receives an action
+ * request from the Flink runtime, it sets up the context, and calls corresponding method of the
+ * SplitEnumerator to take actions.
+ */
+@Internal
+public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator {
+	private static final Logger LOG = LoggerFactory.getLogger(OperatorCoordinator.class);
+	/** A single-thread executor to handle all the changes to the coordinator. */
+	private final ExecutorService coordinatorExecutor;
+	/** The Source that is associated with this SourceCoordinator. */
+	private final Source<?, SplitT, EnumChkT> source;
+	/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
+	private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
+	/** The serializer for the SourceSplit of the associated Source. */
+	private final SimpleVersionedSerializer<SplitT> splitSerializer;
+	/** The context containing the states of the coordinator. */
+	private final SourceCoordinatorContext<SplitT> context;
+	/** The split enumerator created from the associated Source. */
+	private SplitEnumerator<SplitT, EnumChkT> enumerator;
+	/** A flag marking whether the coordinator has started. */
+	private boolean started;
+
+	public SourceCoordinator(
+			ExecutorService coordinatorExecutor,
+			Source<?, SplitT, EnumChkT> source,
+			SourceCoordinatorContext<SplitT> context) {
+		this.coordinatorExecutor = coordinatorExecutor;
+		this.source = source;
+		this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
+		this.splitSerializer = source.getSplitSerializer();
+		this.context = context;
+		this.enumerator = source.createEnumerator(context);
+		this.started = false;
+	}
+
+	@Override
+	public void start() throws Exception {
+		LOG.info("Starting split enumerator.");
+		enumerator.start();
+		started = true;
+	}
+
+	@Override
+	public void close() throws Exception {
+		ensureStarted();

Review comment:
       Good point. We probably don't have to enforce this.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
##########
@@ -0,0 +1,230 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the {@link Source}.
+ *
+ * <p>The <code>SourceCoordinator</code> provides an event loop style thread model to interact with
+ * the Flink runtime. The coordinator ensures that all the state manipulations are made by its event loop
+ * thread. It also helps keep track of the necessary split assignments history per subtask to simplify the
+ * {@link SplitEnumerator} implementation.
+ *
+ * <p>The coordinator maintains a {@link org.apache.flink.api.connector.source.SplitEnumeratorContext
+ * SplitEnumeratorContxt} and shares it with the enumerator. When the coordinator receives an action
+ * request from the Flink runtime, it sets up the context, and calls corresponding method of the
+ * SplitEnumerator to take actions.
+ */
+@Internal
+public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator {
+	private static final Logger LOG = LoggerFactory.getLogger(OperatorCoordinator.class);
+	/** A single-thread executor to handle all the changes to the coordinator. */
+	private final ExecutorService coordinatorExecutor;
+	/** The Source that is associated with this SourceCoordinator. */
+	private final Source<?, SplitT, EnumChkT> source;
+	/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
+	private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
+	/** The serializer for the SourceSplit of the associated Source. */
+	private final SimpleVersionedSerializer<SplitT> splitSerializer;
+	/** The context containing the states of the coordinator. */
+	private final SourceCoordinatorContext<SplitT> context;
+	/** The split enumerator created from the associated Source. */
+	private SplitEnumerator<SplitT, EnumChkT> enumerator;
+	/** A flag marking whether the coordinator has started. */
+	private boolean started;
+
+	public SourceCoordinator(
+			ExecutorService coordinatorExecutor,
+			Source<?, SplitT, EnumChkT> source,
+			SourceCoordinatorContext<SplitT> context) {
+		this.coordinatorExecutor = coordinatorExecutor;
+		this.source = source;
+		this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
+		this.splitSerializer = source.getSplitSerializer();
+		this.context = context;
+		this.enumerator = source.createEnumerator(context);
+		this.started = false;
+	}
+
+	@Override
+	public void start() throws Exception {
+		LOG.info("Starting split enumerator.");
+		enumerator.start();
+		started = true;
+	}
+
+	@Override
+	public void close() throws Exception {
+		ensureStarted();
+		LOG.info("Closing SourceCoordinator.");
+		enumerator.close();
+		coordinatorExecutor.shutdown();
+		LOG.info("Source coordinator closed.");
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+		ensureStarted();
+		coordinatorExecutor.execute(() -> {
+			LOG.debug("Handling event from operator: {}", event);
+			if (event instanceof SourceEventWrapper) {
+				enumerator.handleSourceEvent(subtask, ((SourceEventWrapper) event).getSourceEvent());
+			} else if (event instanceof ReaderRegistrationEvent) {
+				handleReaderRegistrationEvent((ReaderRegistrationEvent) event);
+			}
+		});
+	}
+
+	@Override
+	public void subtaskFailed(int subtaskId) {
+		ensureStarted();
+		coordinatorExecutor.execute(() -> {
+			LOG.info("Handling subtask {} failure.", subtaskId);
+			List<SplitT> splitsToAddBack = context.getAndRemoveUncheckpointedAssignment(subtaskId);
+			context.unregisterSourceReader(subtaskId);
+			LOG.debug("Adding {} back to the split enumerator.", splitsToAddBack);
+			enumerator.addSplitsBack(splitsToAddBack, subtaskId);
+		});
+	}
+
+	@Override
+	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+		ensureStarted();
+		return CompletableFuture.supplyAsync(() -> {
+			try {
+				LOG.debug("Taking a state snapshot for checkpoint {}", checkpointId);
+				return toBytes(checkpointId);
+			} catch (Exception e) {
+				throw new FlinkRuntimeException("Failed to checkpoint coordinator due to ", e);
+			}
+		}, coordinatorExecutor);
+	}
+
+	@Override
+	public void checkpointComplete(long checkpointId) {
+		ensureStarted();
+		coordinatorExecutor.execute(() -> {
+			LOG.info("Marking checkpoint {} as completed.", checkpointId);
+			context.onCheckpointComplete(checkpointId);
+		});
+	}
+
+	@Override
+	public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+		if (started) {

Review comment:
       I was not quite sure whether we should create a new instance of the `SourceCoordinator` or reuse the current one. The main concern is around dangling tasks in the coordinator executor that were scheduled by the `SplitEnumerator`. Resetting the coordinator state on a running split enumerator makes it hard to guarantee deterministic behavior. To avoid potential impact from the leftover instances, there might be two ways:
   
   1. Do the cleanup to close the `SourceCoordinator`, create a new instance, and reset the state. This approach is simple to implement and unlikely to have bugs.
   2. Introduce an epoch and bump it up on state restore. Any request or state change from an old epoch will be ignored. This will avoid some object creations, but is more complicated to get right. I am not sure if the gain in the recovery time is enough to justify the complexity.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
##########
@@ -0,0 +1,230 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the {@link Source}.
+ *
+ * <p>The <code>SourceCoordinator</code> provides an event loop style thread model to interact with
+ * the Flink runtime. The coordinator ensures that all the state manipulations are made by its event loop
+ * thread. It also helps keep track of the necessary split assignments history per subtask to simplify the
+ * {@link SplitEnumerator} implementation.
+ *
+ * <p>The coordinator maintains a {@link org.apache.flink.api.connector.source.SplitEnumeratorContext
+ * SplitEnumeratorContxt} and shares it with the enumerator. When the coordinator receives an action
+ * request from the Flink runtime, it sets up the context, and calls corresponding method of the
+ * SplitEnumerator to take actions.
+ */
+@Internal
+public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator {
+	private static final Logger LOG = LoggerFactory.getLogger(OperatorCoordinator.class);
+	/** A single-thread executor to handle all the changes to the coordinator. */
+	private final ExecutorService coordinatorExecutor;
+	/** The Source that is associated with this SourceCoordinator. */
+	private final Source<?, SplitT, EnumChkT> source;
+	/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
+	private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
+	/** The serializer for the SourceSplit of the associated Source. */
+	private final SimpleVersionedSerializer<SplitT> splitSerializer;
+	/** The context containing the states of the coordinator. */
+	private final SourceCoordinatorContext<SplitT> context;
+	/** The split enumerator created from the associated Source. */
+	private SplitEnumerator<SplitT, EnumChkT> enumerator;
+	/** A flag marking whether the coordinator has started. */
+	private boolean started;
+
+	public SourceCoordinator(
+			ExecutorService coordinatorExecutor,
+			Source<?, SplitT, EnumChkT> source,
+			SourceCoordinatorContext<SplitT> context) {
+		this.coordinatorExecutor = coordinatorExecutor;
+		this.source = source;
+		this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
+		this.splitSerializer = source.getSplitSerializer();
+		this.context = context;
+		this.enumerator = source.createEnumerator(context);
+		this.started = false;
+	}
+
+	@Override
+	public void start() throws Exception {
+		LOG.info("Starting split enumerator.");
+		enumerator.start();
+		started = true;
+	}
+
+	@Override
+	public void close() throws Exception {
+		ensureStarted();
+		LOG.info("Closing SourceCoordinator.");
+		enumerator.close();
+		coordinatorExecutor.shutdown();

Review comment:
       I was trying to avoid blocking the JM main thread. The executor only takes tasks from either the `JobMaster` or the `SplitEnumerator`. Given that the `SplitEnumerator` is already closed and the JM should probably have already stopped invoking anything on this coordinator at this point, I would expect the shutdown to pretty much be a no-op. Maybe it does not hurt to just wait for the coordinator executor to shut down completely.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -0,0 +1,259 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.BiConsumer;
+
+/**
+ * A context class for the {@link OperatorCoordinator}. Compared with {@link SplitEnumeratorContext} this class
+ * allows interaction with state and sending {@link OperatorEvent} to the SourceOperator while
+ * {@link SplitEnumeratorContext} only allows sending {@link SourceEvent}.
+ *
+ * <p>The context serves a few purposes:
+ * <ul>
+ *     <li>
+ *         Information provider - The context provides necessary information to the enumerator for it to
+ *         know what is the status of the source readers and their split assignments. These information
+ *         allows the split enumerator to do the coordination.
+ *     </li>
+ *     <li>
+ *         Action taker - The context also provides a few actions that the enumerator can take to carry
+ *         out the coordination. So far there are two actions: 1) assign splits to the source readers.
+ *         and 2) sens a custom {@link SourceEvent SourceEvents} to the source readers.
+ *     </li>
+ *     <li>
+ *         Thread model enforcement - The context ensures that all the manipulations to the coordinator state
+ *         are handled by the same thread.
+ *     </li>
+ * </ul>
+ * @param <SplitT> the type of the splits.
+ */
+@Internal
+public class SourceCoordinatorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT> {
+	private final ExecutorService coordinatorExecutor;
+	private final ExecutorNotifier notifier;
+	private final OperatorCoordinator.Context operatorCoordinatorContext;
+	private final ConcurrentMap<Integer, ReaderInfo> registeredReaders;
+	private final SplitAssignmentTracker<SplitT> assignmentTracker;
+	private final String coordinatorThreadName;
+
+	public SourceCoordinatorContext(
+			ExecutorService coordinatorExecutor,
+			String coordinatorThreadName,
+			int numWorkerThreads,
+			OperatorCoordinator.Context operatorCoordinatorContext) {
+		this(coordinatorExecutor, coordinatorThreadName, numWorkerThreads, operatorCoordinatorContext,
+				new SplitAssignmentTracker<>());
+	}
+
+	// Package private method for unit test.
+	SourceCoordinatorContext(
+			ExecutorService coordinatorExecutor,
+			String coordinatorThreadName,
+			int numWorkerThreads,
+			OperatorCoordinator.Context operatorCoordinatorContext,
+			SplitAssignmentTracker<SplitT> splitAssignmentTracker) {
+		this.coordinatorExecutor = coordinatorExecutor;
+		this.notifier = new ExecutorNotifier(
+				Executors.newScheduledThreadPool(numWorkerThreads, new ThreadFactory() {
+					private int index = 0;
+					@Override
+					public Thread newThread(Runnable r) {
+						return new Thread(r, coordinatorThreadName + "-worker-" + index++);
+					}
+				}),
+				coordinatorExecutor);
+		this.operatorCoordinatorContext = operatorCoordinatorContext;
+		this.registeredReaders = new ConcurrentHashMap<>();
+		this.assignmentTracker = splitAssignmentTracker;
+		this.coordinatorThreadName = coordinatorThreadName;
+	}
+
+	@Override
+	public MetricGroup metricGroup() {
+		return null;
+	}
+
+	@Override
+	public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+		try {
+			operatorCoordinatorContext.sendEvent(new SourceEventWrapper(event), subtaskId);
+		} catch (TaskNotRunningException e) {
+			throw new FlinkRuntimeException(String.format("Failed to send event %s to subtask %d",
+					event,
+					subtaskId), e);
+		}
+	}
+
+	@Override
+	public int currentParallelism() {
+		return operatorCoordinatorContext.currentParallelism();
+	}
+
+	@Override
+	public Map<Integer, ReaderInfo> registeredReaders() {
+		return Collections.unmodifiableMap(registeredReaders);
+	}
+
+	@Override
+	public void assignSplits(SplitsAssignment<SplitT> assignment) {
+		// Ensure the split assignment is done by the the coordinator executor.
+		if (!Thread.currentThread().getName().equals(coordinatorThreadName)) {

Review comment:
       The thread was created by a thread factory lazily, therefore I took a shortcut... You caught me here :)

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
##########
@@ -0,0 +1,73 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.BiConsumer;
+
+/**
+ * The provider of {@link SourceCoordinator}.
+ */
+public class SourceCoordinatorProvider<SplitT extends SourceSplit>
+		implements OperatorCoordinator.Provider {
+	private final OperatorID operatorID;
+	private final Source<?, SplitT, ?> source;
+	private final int numWorkerThreads;
+
+	/**
+	 * Construct the {@link SourceCoordinatorProvider}.
+	 *
+	 * @param operatorID the ID of the operator this coordinator corresponds to.
+	 * @param source the Source that will be used for this coordinator.
+	 * @param numWorkerThreads the number of threads the should provide to the SplitEnumerator
+	 *                         for doing async calls. See
+	 *                         {@link org.apache.flink.api.connector.source.SplitEnumeratorContext#callAsync(Callable, BiConsumer)
+	 *                         SplitEnumeratorContext.callAsync()}.
+	 */
+	public SourceCoordinatorProvider(
+			OperatorID operatorID,
+			Source<?, SplitT, ?> source,
+			int numWorkerThreads) {
+		this.operatorID = operatorID;
+		this.source = source;
+		this.numWorkerThreads = numWorkerThreads;
+	}
+
+	@Override
+	public OperatorID getOperatorId() {
+		return operatorID;
+	}
+
+	@Override
+	public OperatorCoordinator create(OperatorCoordinator.Context context) {
+		final String coordinatorThreadName = "SourceCoordinator-" + operatorID;
+		ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(

Review comment:
       `DispatcherThreadFactory` seems like a bit of overkill. It calls `System.exit()`, which causes the entire JM to exit. In our case, I am wondering if it is sufficient to just fail the job?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org