You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/22 11:59:30 UTC

[incubator-seatunnel] branch st-engine updated: [Engine][Task] Fix task running error. (#2500)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 730215258 [Engine][Task] Fix task running error. (#2500)
730215258 is described below

commit 730215258ba1fec6249acde37e32bb05c9f6492a
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Aug 22 19:59:24 2022 +0800

    [Engine][Task] Fix task running error. (#2500)
---
 .../engine/client/SeaTunnelClientTest.java         |  2 --
 .../server/operation/DeployTaskOperation.java      |  6 ++--
 .../serializable/TaskDataSerializerHook.java       |  9 ++++--
 .../engine/server/task/SeaTunnelTask.java          |  8 ++++-
 .../server/task/SourceSplitEnumeratorTask.java     | 30 ++++++++++++-------
 .../server/task/flow/SourceFlowLifeCycle.java      |  5 ++--
 .../server/task/flow/TransformFlowLifeCycle.java   |  6 ++++
 .../task/group/TaskGroupWithIntermediateQueue.java |  8 ++---
 ...erOperation.java => CloseRequestOperation.java} | 35 +++++++++++-----------
 ...tion.java => SourceNoMoreElementOperation.java} |  6 ++--
 10 files changed, 68 insertions(+), 47 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 6a1c992a5..6ed009de7 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -84,8 +84,6 @@ public class SeaTunnelClientTest {
             jobProxy.waitForJobComplete();
         } catch (ExecutionException | InterruptedException e) {
             throw new RuntimeException(e);
-            // TODO throw exception after fix sink.setTypeInfo in ConnectorInstanceLoader
-            //            throw new RuntimeException(e);
         }
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
index 8a36d9ea4..4c48060b4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.engine.server.operation;
 
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.server.TaskExecutionService;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
 import com.hazelcast.internal.nio.IOUtil;
@@ -41,8 +41,8 @@ public class DeployTaskOperation extends AsyncOperation {
 
     @Override
     protected PassiveCompletableFuture<?> doRun() throws Exception {
-        TaskExecutionService taskExecutionService = getService();
-        return taskExecutionService.deployTask(taskImmutableInformation);
+        SeaTunnelServer server = getService();
+        return server.getTaskExecutionService().deployTask(taskImmutableInformation);
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index b7b0995c4..059325a36 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -24,9 +24,10 @@ import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkUnregisterOperation;
 import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
+import org.apache.seatunnel.engine.server.task.operation.source.CloseRequestOperation;
 import org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
+import org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
 import org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation;
-import org.apache.seatunnel.engine.server.task.operation.source.SourceUnregisterOperation;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
 import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -53,6 +54,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
 
     public static final int PROGRESS_TYPE = 9;
 
+    public static final int CLOSE_REQUEST_TYPE = 10;
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
             SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
             SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -82,7 +85,7 @@ public class TaskDataSerializerHook implements DataSerializerHook {
                 case TASK_GROUP_INFO_TYPE:
                     return new TaskGroupImmutableInformation();
                 case SOURCE_UNREGISTER_TYPE:
-                    return new SourceUnregisterOperation();
+                    return new SourceNoMoreElementOperation();
                 case SINK_REGISTER_TYPE:
                     return new SinkRegisterOperation();
                 case SINK_UNREGISTER_TYPE:
@@ -91,6 +94,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
                     return new TaskLocation();
                 case PROGRESS_TYPE:
                     return new Progress();
+                case  CLOSE_REQUEST_TYPE:
+                    return new CloseRequestOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 39d0f6294..b5098976c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -61,6 +61,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
 
     protected FlowLifeCycle startFlowLifeCycle;
 
+    private List<FlowLifeCycle> allCycles;
+
     protected List<OneInputFlowLifeCycle<Record<?>>> outputs;
 
     protected List<CompletableFuture<Void>> flowFutures;
@@ -79,7 +81,11 @@ public abstract class SeaTunnelTask extends AbstractTask {
     public void init() throws Exception {
         super.init();
         flowFutures = new ArrayList<>();
+        allCycles = new ArrayList<>();
         startFlowLifeCycle = convertFlowToActionLifeCycle(executionFlow);
+        for (FlowLifeCycle cycle : allCycles) {
+            cycle.init();
+        }
     }
 
     public void setTaskGroup(TaskGroup group) {
@@ -132,7 +138,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
         } else {
             throw new UnknownFlowException(flow);
         }
-        lifeCycle.init();
+        allCycles.add(lifeCycle);
         return lifeCycle;
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 9f7d97f80..c673271d0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -23,21 +23,25 @@ import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.server.execution.ProgressState;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext;
+import org.apache.seatunnel.engine.server.task.operation.source.CloseRequestOperation;
 import org.apache.seatunnel.engine.server.task.statemachine.EnumeratorState;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.NonNull;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.stream.Collectors;
 
 public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends CoordinatorTask {
@@ -49,7 +53,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
     private final SourceAction<?, SplitT, ?> source;
     private SourceSplitEnumerator<?, ?> enumerator;
     private int maxReaderSize;
-    private AtomicInteger unfinishedReaders;
+    private Set<Long> unfinishedReaders;
     private Map<TaskLocation, Address> taskMemberMapping;
     private Map<Long, TaskLocation> taskIDToTaskLocationMapping;
 
@@ -70,7 +74,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         taskMemberMapping = new ConcurrentHashMap<>();
         taskIDToTaskLocationMapping = new ConcurrentHashMap<>();
         maxReaderSize = source.getParallelism();
-        unfinishedReaders = new AtomicInteger(maxReaderSize);
+        unfinishedReaders = new CopyOnWriteArraySet<>();
         enumerator.open();
     }
 
@@ -122,8 +126,8 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
     }
 
     public void readerFinished(long taskID) {
-        removeTaskMemberMapping(taskID);
-        if (unfinishedReaders.decrementAndGet() == 0) {
+        unfinishedReaders.remove(taskID);
+        if (unfinishedReaders.isEmpty()) {
             readerFinishFuture.complete(null);
         }
     }
@@ -147,6 +151,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
                 currState = EnumeratorState.READER_CLOSED;
                 break;
             case READER_CLOSED:
+                closeAllReader();
                 currState = EnumeratorState.CLOSED;
                 break;
             case CLOSED:
@@ -167,16 +172,19 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         readerRegisterFuture.join();
     }
 
-    public void removeTaskMemberMapping(long taskID) {
-        TaskLocation taskLocation = taskIDToTaskLocationMapping.get(taskID);
-        taskMemberMapping.remove(taskLocation);
-        taskIDToTaskLocationMapping.remove(taskID);
-    }
-
     public Set<Long> getRegisteredReaders() {
         return taskMemberMapping.keySet().stream().map(TaskLocation::getTaskID).collect(Collectors.toSet());
     }
 
+    private void closeAllReader() {
+        List<InvocationFuture<?>> futures = new ArrayList<>();
+        taskMemberMapping.forEach((location, address) -> {
+            futures.add(this.getExecutionContext().sendToMember(new CloseRequestOperation(location),
+                    address));
+        });
+        futures.forEach(InvocationFuture::join);
+    }
+
     @Override
     public Set<URL> getJarsUrl() {
         return new HashSet<>(source.getJarUrls());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index c5f4fb9cc..3b499d63f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -25,8 +25,8 @@ import org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.context.SourceReaderContext;
 import org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
+import org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
 import org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation;
-import org.apache.seatunnel.engine.server.task.operation.source.SourceUnregisterOperation;
 
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
@@ -95,9 +95,8 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
         try {
             this.closed = true;
             collector.close();
-            runningTask.getExecutionContext().sendToMaster(new SourceUnregisterOperation(currentTaskID,
+            runningTask.getExecutionContext().sendToMaster(new SourceNoMoreElementOperation(currentTaskID,
                     enumeratorTaskID)).get();
-            this.close();
         } catch (Exception e) {
             LOGGER.warning("source close failed ", e);
             throw new RuntimeException(e);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index ebd90a0f8..c1698450a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task.flow;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.transform.Collector;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.engine.server.task.TaskRuntimeException;
 import org.apache.seatunnel.engine.server.task.record.ClosedSign;
 
 import java.util.List;
@@ -42,6 +43,11 @@ public class TransformFlowLifeCycle<T> extends AbstractFlowLifeCycle implements
     public void received(Record<?> row) {
         if (row.getData() instanceof ClosedSign) {
             collector.collect(row);
+            try {
+                this.close();
+            } catch (Exception e) {
+                throw new TaskRuntimeException(e);
+            }
         } else {
             T r = (T) row.getData();
             for (SeaTunnelTransform<T> t : transform) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
index 6ec3c86b1..08d5de1c1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
@@ -23,10 +23,10 @@ import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class TaskGroupWithIntermediateQueue extends TaskGroupDefaultImpl {
 
@@ -40,15 +40,13 @@ public class TaskGroupWithIntermediateQueue extends TaskGroupDefaultImpl {
 
     @Override
     public void init() {
-        blockingQueueCache = new HashMap<>();
+        blockingQueueCache = new ConcurrentHashMap<>();
         getTasks().stream().filter(SeaTunnelTask.class::isInstance)
                 .map(s -> (SeaTunnelTask) s).forEach(s -> s.setTaskGroup(this));
     }
 
     public BlockingQueue<Record<?>> getBlockingQueueCache(long id) {
-        if (!blockingQueueCache.containsKey(id)) {
-            blockingQueueCache.put(id, new ArrayBlockingQueue<>(QUEUE_SIZE));
-        }
+        blockingQueueCache.computeIfAbsent(id, i -> new ArrayBlockingQueue<>(QUEUE_SIZE));
         return blockingQueueCache.get(id);
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceUnregisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
similarity index 67%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceUnregisterOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
index 935d81a9d..ee3783352 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceUnregisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.engine.server.task.operation.source;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
+import org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask;
 
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
@@ -29,40 +29,41 @@ import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
 
-public class SourceUnregisterOperation extends Operation implements IdentifiedDataSerializable {
+public class CloseRequestOperation extends Operation implements IdentifiedDataSerializable {
 
-    private TaskLocation currentTaskID;
-    private TaskLocation enumeratorTaskID;
+    private TaskLocation readerLocation;
 
-    public SourceUnregisterOperation() {
+    public CloseRequestOperation() {
     }
 
-    public SourceUnregisterOperation(TaskLocation currentTaskID, TaskLocation enumeratorTaskID) {
-        this.currentTaskID = currentTaskID;
-        this.enumeratorTaskID = enumeratorTaskID;
+    public CloseRequestOperation(TaskLocation readerLocation) {
+        this.readerLocation = readerLocation;
     }
 
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
-        SourceSplitEnumeratorTask<?> task =
-                server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID())
-                        .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
-        task.readerFinished(currentTaskID.getTaskID());
+        SourceSeaTunnelTask<?, ?> task =
+                server.getTaskExecutionService().getExecutionContext(readerLocation.getTaskGroupID())
+                        .getTaskGroup().getTask(readerLocation.getTaskID());
+        task.close();
+    }
+
+    @Override
+    public String getServiceName() {
+        return SeaTunnelServer.SERVICE_NAME;
     }
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         super.writeInternal(out);
-        currentTaskID.writeData(out);
-        enumeratorTaskID.writeData(out);
+        readerLocation.writeData(out);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         super.readInternal(in);
-        currentTaskID.readData(in);
-        enumeratorTaskID.readData(in);
+        readerLocation.readData(in);
     }
 
     @Override
@@ -72,6 +73,6 @@ public class SourceUnregisterOperation extends Operation implements IdentifiedDa
 
     @Override
     public int getClassId() {
-        return TaskDataSerializerHook.SOURCE_UNREGISTER_TYPE;
+        return TaskDataSerializerHook.CLOSE_REQUEST_TYPE;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceUnregisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
similarity index 91%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceUnregisterOperation.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index 935d81a9d..b8cabcc16 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceUnregisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -29,15 +29,15 @@ import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
 
-public class SourceUnregisterOperation extends Operation implements IdentifiedDataSerializable {
+public class SourceNoMoreElementOperation extends Operation implements IdentifiedDataSerializable {
 
     private TaskLocation currentTaskID;
     private TaskLocation enumeratorTaskID;
 
-    public SourceUnregisterOperation() {
+    public SourceNoMoreElementOperation() {
     }
 
-    public SourceUnregisterOperation(TaskLocation currentTaskID, TaskLocation enumeratorTaskID) {
+    public SourceNoMoreElementOperation(TaskLocation currentTaskID, TaskLocation enumeratorTaskID) {
         this.currentTaskID = currentTaskID;
         this.enumeratorTaskID = enumeratorTaskID;
     }