You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/02 04:20:49 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-721] Remove additional ack. Simplify watermark manager (remov…
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f42c5f8 [GOBBLIN-721] Remove additional ack. Simplify watermark manager (remov…
f42c5f8 is described below
commit f42c5f8e565f96f96adfc35b8c2ebed0c8c2e903
Author: Shirshanka Das <sd...@linkedin.com>
AuthorDate: Mon Apr 1 21:20:41 2019 -0700
[GOBBLIN-721] Remove additional ack. Simplify watermark manager (remov…
Closes #2588 from shirshanka/stream-fix
---
.../writer/InstrumentedDataWriterDecorator.java | 12 -
.../apache/gobblin/writer/AsyncWriterManager.java | 10 -
.../writer/MultiWriterWatermarkManager.java | 159 ----------
.../gobblin/writer/WatermarkAwareWriter.java | 24 +-
.../writer/WatermarkAwareWriterWrapper.java | 8 -
.../writer/MultiWriterWatermarkManagerTest.java | 350 ---------------------
.../org/apache/gobblin/writer/ConsoleWriter.java | 24 +-
.../gobblin/writer/PartitionedDataWriter.java | 37 ---
.../gobblin/writer/PartitionedWriterTest.java | 43 ---
.../writer/ElasticsearchRestWriter.java | 12 -
.../gobblin/runtime/StreamModelTaskRunner.java | 3 +-
.../main/java/org/apache/gobblin/runtime/Task.java | 37 +--
.../java/org/apache/gobblin/runtime/fork/Fork.java | 2 -
.../apache/gobblin/runtime/TaskContinuousTest.java | 241 +++++++-------
14 files changed, 134 insertions(+), 828 deletions(-)
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
index efc349f..af49d15 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
@@ -143,18 +143,6 @@ public class InstrumentedDataWriterDecorator<D> extends InstrumentedDataWriterBa
}
@Override
- public Map<String, CheckpointableWatermark> getCommittableWatermark() {
- Preconditions.checkState(isWatermarkCapable());
- return watermarkAwareWriter.get().getCommittableWatermark();
- }
-
- @Override
- public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
- Preconditions.checkState(isWatermarkCapable());
- return watermarkAwareWriter.get().getUnacknowledgedWatermark();
- }
-
- @Override
public ControlMessageHandler getMessageHandler() {
return this.embeddedWriter.getMessageHandler();
}
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
index a599753..4f690d1 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
@@ -117,16 +117,6 @@ public class AsyncWriterManager<D> implements WatermarkAwareWriter<D>, DataWrite
return true;
}
- @Override
- public Map<String, CheckpointableWatermark> getCommittableWatermark() {
- throw new UnsupportedOperationException("This writer does not keep track of committed watermarks");
- }
-
- @Override
- public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
- throw new UnsupportedOperationException("This writer does not keep track of uncommitted watermarks");
- }
-
/**
* A class to store attempts at writing a record
**/
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/MultiWriterWatermarkManager.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/MultiWriterWatermarkManager.java
deleted file mode 100644
index c4993f7..0000000
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/MultiWriterWatermarkManager.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.writer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
-import lombok.Getter;
-import lombok.ToString;
-
-import org.apache.gobblin.source.extractor.CheckpointableWatermark;
-import org.apache.gobblin.util.ExecutorsUtils;
-
-
-/**
- * Responsible for managing continuous commit of watermarks.
- * Periodically fetches watermarks from WatermarkAwareWriters and commits them to WatermarkStorage.
- * TODO: Add metrics monitoring
- */
-public class MultiWriterWatermarkManager implements WatermarkManager {
-
-
- private final Queue<WatermarkAwareWriter> _watermarkAwareWriters;
- private final WatermarkStorage _watermarkStorage;
- private final long _commitIntervalMillis;
- private final ScheduledExecutorService _watermarkCommitThreadPool;
- private final Logger _logger;
- private final RetrievalStatus _retrievalStatus;
- private final CommitStatus _commitStatus;
-
- @VisibleForTesting
- final Runnable _watermarkCommitter = new
-
- Runnable() {
- @Override
- public void run() {
- long startTime = System.nanoTime();
- Map<String, CheckpointableWatermark> watermarksToCommit = null;
- try {
- _retrievalStatus.onAttempt();
- WatermarkTracker watermarkTracker = new MultiWriterWatermarkTracker();
- for (WatermarkAwareWriter writer : _watermarkAwareWriters) {
- Map<String, CheckpointableWatermark> writerWatermarks = writer.getCommittableWatermark();
- _logger.debug("Retrieved from writer {} : watermark {} ", writer.getClass().getName(), writerWatermarks);
- watermarkTracker.committedWatermarks(writerWatermarks);
- }
- watermarksToCommit = watermarkTracker.getAllCommitableWatermarks();
- _retrievalStatus.onSuccess(watermarksToCommit);
- }
- catch (Exception e) {
- _retrievalStatus.onFailure(e);
- _logger.error("Failed to get watermark", e);
- }
- // Prevent multiple commits concurrently
- synchronized (this) {
- if (watermarksToCommit != null && !watermarksToCommit.isEmpty()) {
- try {
- _commitStatus.onAttempt();
- _logger.info("Will commit watermark {}", watermarksToCommit.toString());
- //TODO: Not checking if this watermark has already been committed successfully.
- _watermarkStorage.commitWatermarks(watermarksToCommit.values());
- _commitStatus.onSuccess(watermarksToCommit);
- } catch (Exception e) {
- _commitStatus.onFailure(e, watermarksToCommit);
- _logger.error("Failed to write watermark", e);
- }
- } else {
- _logger.info("Nothing to commit");
- }
- }
- long duration = (System.nanoTime() - startTime)/1000000;
- _logger.info("Duration of run {} milliseconds", duration);
- }
- };
-
-
- public MultiWriterWatermarkManager(WatermarkStorage storage, long commitIntervalMillis, Optional<Logger> logger) {
- Preconditions.checkArgument(storage != null, "WatermarkStorage cannot be null");
- _watermarkAwareWriters = new ConcurrentLinkedQueue<>();
- _watermarkStorage = storage;
- _commitIntervalMillis = commitIntervalMillis;
- _logger = logger.or(LoggerFactory.getLogger(MultiWriterWatermarkManager.class));
- _watermarkCommitThreadPool = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(logger,
- Optional.of("WatermarkManager-%d")));
- _retrievalStatus = new RetrievalStatus();
- _commitStatus = new CommitStatus();
- }
-
- public void registerWriter(WatermarkAwareWriter dataWriter) {
- _watermarkAwareWriters.add(dataWriter);
- _logger.info("Registered a watermark aware writer {}", dataWriter.getClass().getName());
- }
-
- @Override
- public void start() {
- _watermarkCommitThreadPool
- .scheduleWithFixedDelay(_watermarkCommitter, 0, _commitIntervalMillis, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void close()
- throws IOException {
- _logger.info("Watermark committer closing");
- _watermarkCommitThreadPool.shutdown();
- try {
- long startTime = System.nanoTime();
- _watermarkCommitThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
- long duration = (System.nanoTime() - startTime)/ 1000000;
- _logger.info("Duration of termination wait was {} milliseconds", duration);
- }
- catch (InterruptedException ie) {
- throw new IOException("Interrupted while waiting for committer to shutdown", ie);
- }
- finally {
- // final watermark commit
- _logger.info("Watermark committer: one last commit before shutting down");
- _watermarkCommitter.run();
- }
- }
-
- @Override
- public CommitStatus getCommitStatus() {
- return _commitStatus;
- }
-
- @Override
- public RetrievalStatus getRetrievalStatus() {
- return _retrievalStatus;
- }
-}
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriter.java
index 581fd0f..bec90a4 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriter.java
@@ -35,28 +35,18 @@ public interface WatermarkAwareWriter<D> extends DataWriter<D> {
*
* @return true if the writer can support watermark-bearing record envelopes
*/
- boolean isWatermarkCapable();
+ default boolean isWatermarkCapable() {
+ return true;
+ }
/**
* Write a record (possibly asynchronously), ack the envelope on success.
* @param recordEnvelope: a container for the record and the acknowledgable watermark
* @throws IOException: if this write (or preceding write failures) have caused a fatal exception.
*/
- void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws IOException;
-
- /**
- * @return A Watermark per source that can safely be committed because all records associated with it
- * and earlier watermarks have been committed to the destination. Return empty if no such watermark exists.
- *
- */
- @Deprecated
- Map<String, CheckpointableWatermark> getCommittableWatermark();
-
- /**
- *
- * @return The lowest watermark out of all pending write requests
- */
- @Deprecated
- Map<String, CheckpointableWatermark> getUnacknowledgedWatermark();
+ default void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws IOException {
+ write(recordEnvelope.getRecord());
+ recordEnvelope.ack();
+ }
}
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriterWrapper.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriterWrapper.java
index a2edb3b..686e178 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriterWrapper.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriterWrapper.java
@@ -43,12 +43,4 @@ public abstract class WatermarkAwareWriterWrapper<D> extends WriterWrapper<D> im
watermarkAwareWriter.get().writeEnvelope(recordEnvelope);
}
- public final Map<String, CheckpointableWatermark> getCommittableWatermark() {
- return watermarkAwareWriter.get().getCommittableWatermark();
- }
-
- public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
- return watermarkAwareWriter.get().getUnacknowledgedWatermark();
- }
-
}
diff --git a/gobblin-core-base/src/test/java/org/apache/gobblin/writer/MultiWriterWatermarkManagerTest.java b/gobblin-core-base/src/test/java/org/apache/gobblin/writer/MultiWriterWatermarkManagerTest.java
deleted file mode 100644
index fd59bc6..0000000
--- a/gobblin-core-base/src/test/java/org/apache/gobblin/writer/MultiWriterWatermarkManagerTest.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.writer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-
-import org.apache.gobblin.source.extractor.CheckpointableWatermark;
-import org.apache.gobblin.source.extractor.DefaultCheckpointableWatermark;
-import org.apache.gobblin.stream.RecordEnvelope;
-import org.apache.gobblin.source.extractor.extract.LongWatermark;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-
-@Test
-public class MultiWriterWatermarkManagerTest {
-
-
- @Test
- public void testConstructor() {
-
- WatermarkStorage watermarkStorage = null;
- try {
- MultiWriterWatermarkManager watermarkManager = new MultiWriterWatermarkManager(watermarkStorage, 0, Optional.<Logger>absent());
- Assert.fail("Should have thrown an exception");
- } catch (Exception e) {
-
- }
- }
-
- /**
- * Test that when no sources are registered, no side effects are observed
- */
- @Test
- public void testNoRegisteredSource()
- throws IOException, InterruptedException {
-
- WatermarkStorage mockWatermarkStorage = mock(WatermarkStorage.class);
- MultiWriterWatermarkManager watermarkManager = new MultiWriterWatermarkManager(mockWatermarkStorage, 1000, Optional.<Logger>absent());
- try {
- watermarkManager.start();
- } catch (Exception e) {
- Assert.fail("Should not throw exception", e);
- }
-
- Thread.sleep(2000);
-
- watermarkManager.close();
- verify(mockWatermarkStorage, times(0)).commitWatermarks(any(Iterable.class));
-
- MultiWriterWatermarkManager.CommitStatus watermarkMgrStatus = watermarkManager.getCommitStatus();
- Assert.assertTrue(watermarkMgrStatus.getLastCommittedWatermarks().isEmpty(),
- "Last committed watermarks should be empty");
- Assert.assertEquals(watermarkMgrStatus.getLastWatermarkCommitSuccessTimestampMillis(), 0 ,
- "Last committed watermark timestamp should be 0");
-
- }
-
- /**
- * Test that when we have commits failing to watermark storage, the manager continues to try
- * at every interval and keeps track of the exception it is seeing.
- */
- @Test
- public void testFailingWatermarkStorage()
- throws IOException, InterruptedException {
-
- WatermarkStorage reallyBadWatermarkStorage = mock(WatermarkStorage.class);
- IOException exceptionToThrow = new IOException("Failed to write coz the programmer told me to");
-
- doThrow(exceptionToThrow).when(reallyBadWatermarkStorage).commitWatermarks(any(Iterable.class));
-
-
- long commitInterval = 1000;
-
- MultiWriterWatermarkManager
- watermarkManager = new MultiWriterWatermarkManager(reallyBadWatermarkStorage, commitInterval, Optional.<Logger>absent());
-
- WatermarkAwareWriter mockWriter = mock(WatermarkAwareWriter.class);
- CheckpointableWatermark watermark = new DefaultCheckpointableWatermark("default", new LongWatermark(0));
- when(mockWriter.getCommittableWatermark()).thenReturn(Collections.singletonMap("default", watermark));
- watermarkManager.registerWriter(mockWriter);
- try {
- watermarkManager.start();
- } catch (Exception e) {
- Assert.fail("Should not throw exception", e);
- }
-
- Thread.sleep(commitInterval * 2 + (commitInterval/2)); // sleep for 2.5 iterations
- watermarkManager.close();
- int expectedCalls = 3; // 2 calls from iterations, 1 additional attempt due to close
- verify(reallyBadWatermarkStorage, atLeast(expectedCalls)).commitWatermarks(any(Iterable.class));
- Assert.assertEquals(watermarkManager.getCommitStatus().getLastCommitException(), exceptionToThrow,
- "Testing tracking of failed exceptions");
-
- }
-
-
- private WatermarkAwareWriter getFlakyWatermarkWriter(final long failEvery) {
- WatermarkAwareWriter mockWatermarkWriter = new WatermarkAwareWriter() {
-
- private long watermark = 0;
-
- @Override
- public boolean isWatermarkCapable() {
- return true;
- }
-
- @Override
- public void writeEnvelope(RecordEnvelope recordEnvelope)
- throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<String, CheckpointableWatermark> getCommittableWatermark() {
- watermark++;
- if (watermark % failEvery == 0) {
- throw new RuntimeException("Failed because you asked me to");
- }
- return Collections.singletonMap("default",
- (CheckpointableWatermark) new DefaultCheckpointableWatermark("default", new LongWatermark(watermark)));
- }
-
- @Override
- public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
- return null;
- }
-
- @Override
- public void write(Object record)
- throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void commit()
- throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void cleanup()
- throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long recordsWritten() {
- return 0;
- }
-
- @Override
- public long bytesWritten()
- throws IOException {
- return 0;
- }
-
- @Override
- public void close()
- throws IOException {
-
- }
- };
-
- return mockWatermarkWriter;
-
- }
-
- /**
- * Test that in the presence of flaky Watermark writers, we continue to log retrieval status correctly
- */
- @Test
- public void testRetrievalStatus()
- throws InterruptedException, IOException {
-
- WatermarkStorage mockWatermarkStorage = mock(WatermarkStorage.class);
-
- MultiWriterWatermarkManager watermarkManager = new MultiWriterWatermarkManager(mockWatermarkStorage, 1000, Optional.<Logger>absent());
-
- watermarkManager.registerWriter(getFlakyWatermarkWriter(2));
-
-
- try {
- watermarkManager.start();
- } catch (Exception e) {
- Assert.fail("Should not throw exception", e);
- }
-
- Thread.sleep(2000);
-
- watermarkManager.close();
-
- MultiWriterWatermarkManager.RetrievalStatus retrievalStatus = watermarkManager.getRetrievalStatus();
- Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalAttemptTimestampMillis() > 0);
- Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalSuccessTimestampMillis() > 0);
- Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalFailureTimestampMillis() > 0);
- System.out.println(retrievalStatus);
-
- }
-
- /**
- * Test that in the presence of intermittent commit successes and failures, we continue to make progress
- */
- @Test
- public void testFlakyWatermarkStorage()
- throws IOException, InterruptedException {
-
- final int failEvery = 2;
-
- WatermarkStorage mockWatermarkStorage = new WatermarkStorage() {
- private int watermarkInstance = 0;
- private List<CheckpointableWatermark> checkpointed = new ArrayList<>();
- @Override
- public void commitWatermarks(java.lang.Iterable<CheckpointableWatermark> watermarks)
- throws IOException {
- ++watermarkInstance;
- if (watermarkInstance % failEvery == 0) {
- throw new IOException("Failed to write");
- } else {
- checkpointed.clear();
- for (CheckpointableWatermark watermark: watermarks) {
- checkpointed.add(watermark);
- }
- }
- }
-
- @Override
- public Map<String, CheckpointableWatermark> getCommittedWatermarks(
- Class<? extends CheckpointableWatermark> watermarkClass, Iterable<String> sourcePartitions)
- throws IOException {
- return null;
- }
- };
-
-
- WatermarkAwareWriter mockWatermarkWriter = new WatermarkAwareWriter() {
-
- private long watermark = 0;
-
- @Override
- public boolean isWatermarkCapable() {
- return true;
- }
-
- @Override
- public void writeEnvelope(RecordEnvelope recordEnvelope)
- throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<String, CheckpointableWatermark> getCommittableWatermark() {
- watermark++;
- return Collections.singletonMap("default",
- (CheckpointableWatermark) new DefaultCheckpointableWatermark("default", new LongWatermark(watermark)));
- }
-
- @Override
- public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
- return null;
- }
-
- @Override
- public void write(Object record)
- throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void commit()
- throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void cleanup()
- throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long recordsWritten() {
- return 0;
- }
-
- @Override
- public long bytesWritten()
- throws IOException {
- return 0;
- }
-
- @Override
- public void close()
- throws IOException {
-
- }
- };
-
- MultiWriterWatermarkManager watermarkManager = new MultiWriterWatermarkManager(mockWatermarkStorage, 1000, Optional.<Logger>absent());
-
- watermarkManager.registerWriter(mockWatermarkWriter);
-
-
- try {
- watermarkManager.start();
- } catch (Exception e) {
- Assert.fail("Should not throw exception", e);
- }
-
- Thread.sleep(2000);
-
- watermarkManager.close();
-
- MultiWriterWatermarkManager.CommitStatus commitStatus = watermarkManager.getCommitStatus();
- System.out.println(commitStatus);
- MultiWriterWatermarkManager.RetrievalStatus retrievalStatus = watermarkManager.getRetrievalStatus();
- Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalAttemptTimestampMillis() > 0);
- Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalSuccessTimestampMillis() > 0);
- Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalFailureTimestampMillis() == 0);
- System.out.println(retrievalStatus);
-
- }
-
-}
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/ConsoleWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/ConsoleWriter.java
index 858bff8..a846e0a 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/ConsoleWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/ConsoleWriter.java
@@ -78,29 +78,6 @@ public class ConsoleWriter<D> implements WatermarkAwareWriter<D> {
log.debug("Close called");
}
- @Override
- public boolean isWatermarkCapable() {
- return true;
- }
-
-
- @Override
- public void writeEnvelope(RecordEnvelope<D> recordEnvelope)
- throws IOException {
- write(recordEnvelope.getRecord());
- recordEnvelope.ack();
- }
-
- @Override
- public Map<String, CheckpointableWatermark> getCommittableWatermark() {
- throw new UnsupportedOperationException("This writer does not keep track of committed watermarks");
- }
-
- @Override
- public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
- throw new UnsupportedOperationException("This writer does not keep track of uncommitted watermarks");
- }
-
/**
* Flush console output
*/
@@ -108,5 +85,6 @@ public class ConsoleWriter<D> implements WatermarkAwareWriter<D> {
public void flush() throws IOException {
System.out.flush();
}
+
}
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index cdfc11b..3dad0ef 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -305,43 +305,6 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
}
@Override
- public Map<String, CheckpointableWatermark> getCommittableWatermark() {
- // The committable watermark from a collection of commitable and unacknowledged watermarks is the highest
- // committable watermark that is less than the lowest unacknowledged watermark
-
- WatermarkTracker watermarkTracker = new MultiWriterWatermarkTracker();
- for (Map.Entry<GenericRecord, DataWriter<D>> entry : this.partitionWriters.asMap().entrySet()) {
- if (entry.getValue() instanceof WatermarkAwareWriter) {
- Map<String, CheckpointableWatermark> commitableWatermarks =
- ((WatermarkAwareWriter) entry.getValue()).getCommittableWatermark();
- if (!commitableWatermarks.isEmpty()) {
- watermarkTracker.committedWatermarks(commitableWatermarks);
- }
-
- Map<String, CheckpointableWatermark> unacknowledgedWatermark =
- ((WatermarkAwareWriter) entry.getValue()).getUnacknowledgedWatermark();
- if (!unacknowledgedWatermark.isEmpty()) {
- watermarkTracker.unacknowledgedWatermarks(unacknowledgedWatermark);
- }
- }
- }
- return watermarkTracker.getAllCommitableWatermarks(); //TODO: Change this to use List of committables instead
- }
-
- @Override
- public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
- WatermarkTracker watermarkTracker = new MultiWriterWatermarkTracker();
- for (Map.Entry<GenericRecord, DataWriter<D>> entry : this.partitionWriters.asMap().entrySet()) {
- Map<String, CheckpointableWatermark> unacknowledgedWatermark =
- ((WatermarkAwareWriter) entry.getValue()).getUnacknowledgedWatermark();
- if (!unacknowledgedWatermark.isEmpty()) {
- watermarkTracker.unacknowledgedWatermarks(unacknowledgedWatermark);
- }
- }
- return watermarkTracker.getAllUnacknowledgedWatermarks();
- }
-
- @Override
public ControlMessageHandler getMessageHandler() {
return this.controlMessageHandler;
}
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
index 20984ef..ddab5c2 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
@@ -140,49 +140,6 @@ public class PartitionedWriterTest {
Assert.assertEquals(action.getType(), TestPartitionAwareWriterBuilder.Actions.COMMIT);
}
- @Test
- public void testWatermarkComputation() throws IOException {
- testWatermarkComputation(0L, 1L, 0L);
- testWatermarkComputation(1L, 0L, null);
- testWatermarkComputation(0L, 0L, null);
- testWatermarkComputation(20L, 1L, null);
- }
-
- public void testWatermarkComputation(Long committed, Long unacknowledged, Long expected) throws IOException {
- State state = new State();
- state.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS, TestPartitioner.class.getCanonicalName());
-
- String defaultSource = "default";
-
- WatermarkAwareWriter mockDataWriter = mock(WatermarkAwareWriter.class);
- when(mockDataWriter.isWatermarkCapable()).thenReturn(true);
- when(mockDataWriter.getCommittableWatermark()).thenReturn(Collections.singletonMap(defaultSource,
- new DefaultCheckpointableWatermark(defaultSource, new LongWatermark(committed))));
- when(mockDataWriter.getUnacknowledgedWatermark()).thenReturn(Collections.singletonMap(defaultSource,
- new DefaultCheckpointableWatermark(defaultSource, new LongWatermark(unacknowledged))));
-
- PartitionAwareDataWriterBuilder builder = mock(PartitionAwareDataWriterBuilder.class);
- when(builder.validatePartitionSchema(any(Schema.class))).thenReturn(true);
- when(builder.forPartition(any(GenericRecord.class))).thenReturn(builder);
- when(builder.withWriterId(any(String.class))).thenReturn(builder);
- when(builder.build()).thenReturn(mockDataWriter);
-
- PartitionedDataWriter writer = new PartitionedDataWriter<String, String>(builder, state);
-
- RecordEnvelope<String> recordEnvelope = new RecordEnvelope<String>("0");
- recordEnvelope.addCallBack(
- new AcknowledgableWatermark(new DefaultCheckpointableWatermark(defaultSource, new LongWatermark(0))));
- writer.writeEnvelope(recordEnvelope);
-
- Map<String, CheckpointableWatermark> watermark = writer.getCommittableWatermark();
- System.out.println(watermark.toString());
- if (expected == null) {
- Assert.assertTrue(watermark.isEmpty(), "Expected watermark to be absent");
- } else {
- Assert.assertTrue(watermark.size() == 1);
- Assert.assertEquals((long) expected, ((LongWatermark) watermark.values().iterator().next().getWatermark()).getValue());
- }
- }
@Test
public void testControlMessageHandler() throws IOException {
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
index 7cd77da..9d99bc6 100644
--- a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
@@ -22,37 +22,25 @@ import java.io.IOException;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.math3.util.Pair;
import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.Batch;
import org.apache.gobblin.writer.BatchAsyncDataWriter;
-import org.apache.gobblin.writer.GenericWriteResponse;
import org.apache.gobblin.writer.WriteCallback;
import org.apache.gobblin.writer.WriteResponse;
import org.apache.http.HttpHost;
-import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
-import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.xcontent.XContentType;
import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
index b20aeda..f490997 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
@@ -72,7 +72,6 @@ public class StreamModelTaskRunner {
private final Optional<WatermarkManager> watermarkManager;
private final Optional<WatermarkStorage> watermarkStorage;
private final Map<Optional<Fork>, Optional<Future<?>>> forks;
- private final String watermarkingStrategy;
protected void run() throws Exception {
long maxWaitInMinute = taskState.getPropAsLong(ConfigurationKeys.FORK_MAX_WAIT_MININUTES, ConfigurationKeys.DEFAULT_FORK_MAX_WAIT_MININUTES);
@@ -140,7 +139,7 @@ public class StreamModelTaskRunner {
Fork fork = new Fork(this.taskContext, forkedStream.getGlobalMetadata().getSchema(), forkedStreams.getForkedStreams().size(), fidx, this.taskMode);
fork.consumeRecordStream(forkedStream);
this.forks.put(Optional.of(fork), Optional.of(Futures.immediateFuture(null)));
- this.task.configureStreamingFork(fork, this.watermarkingStrategy);
+ this.task.configureStreamingFork(fork);
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 8ddd1d4..db89a17 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -87,7 +87,6 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.AcknowledgableWatermark;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.FineGrainedWatermarkTracker;
-import org.apache.gobblin.writer.MultiWriterWatermarkManager;
import org.apache.gobblin.writer.TrackerBasedWatermarkManager;
import org.apache.gobblin.writer.WatermarkAwareWriter;
import org.apache.gobblin.writer.WatermarkManager;
@@ -149,7 +148,6 @@ public class Task implements TaskIFace {
private final InstrumentedExtractorBase extractor;
private final RowLevelPolicyChecker rowChecker;
private final ExecutionModel taskMode;
- private final String watermarkingStrategy;
private final Optional<WatermarkManager> watermarkManager;
private final Optional<FineGrainedWatermarkTracker> watermarkTracker;
private final Optional<WatermarkStorage> watermarkStorage;
@@ -235,8 +233,6 @@ public class Task implements TaskIFace {
// Setup Streaming constructs
- this.watermarkingStrategy = "FineGrain"; // TODO: Configure
-
if (isStreamingTask()) {
Extractor underlyingExtractor = this.taskContext.getRawSourceExtractor();
if (!(underlyingExtractor instanceof StreamingExtractor)) {
@@ -259,18 +255,11 @@ public class Task implements TaskIFace {
long commitIntervalMillis = ConfigUtils.getLong(config,
TaskConfigurationKeys.STREAMING_WATERMARK_COMMIT_INTERVAL_MILLIS,
TaskConfigurationKeys.DEFAULT_STREAMING_WATERMARK_COMMIT_INTERVAL_MILLIS);
- if (watermarkingStrategy.equals("FineGrain")) { // TODO: Configure
- this.watermarkTracker = Optional.of(this.closer.register(new FineGrainedWatermarkTracker(config)));
- this.watermarkManager = Optional.of((WatermarkManager) this.closer.register(
- new TrackerBasedWatermarkManager(this.watermarkStorage.get(), this.watermarkTracker.get(),
- commitIntervalMillis, Optional.of(this.LOG))));
+ this.watermarkTracker = Optional.of(this.closer.register(new FineGrainedWatermarkTracker(config)));
+ this.watermarkManager = Optional.of((WatermarkManager) this.closer.register(
+ new TrackerBasedWatermarkManager(this.watermarkStorage.get(), this.watermarkTracker.get(),
+ commitIntervalMillis, Optional.of(this.LOG))));
- } else {
- // writer-based watermarking
- this.watermarkManager = Optional.of((WatermarkManager) this.closer.register(
- new MultiWriterWatermarkManager(this.watermarkStorage.get(), commitIntervalMillis, Optional.of(this.LOG))));
- this.watermarkTracker = Optional.absent();
- }
} else {
this.watermarkManager = Optional.absent();
this.watermarkTracker = Optional.absent();
@@ -368,7 +357,7 @@ public class Task implements TaskIFace {
} else {
new StreamModelTaskRunner(this, this.taskState, this.closer, this.taskContext, this.extractor,
this.converter, this.recordStreamProcessors, this.rowChecker, this.taskExecutor, this.taskMode, this.shutdownRequested,
- this.watermarkTracker, this.watermarkManager, this.watermarkStorage, this.forks, this.watermarkingStrategy).run();
+ this.watermarkTracker, this.watermarkManager, this.watermarkStorage, this.forks).run();
}
LOG.info("Extracted " + this.recordsPulled + " data records");
@@ -434,7 +423,7 @@ public class Task implements TaskIFace {
AsynchronousFork fork = closer.register(
new AsynchronousFork(this.taskContext, schema instanceof Copyable ? ((Copyable) schema).copy() : schema,
branches, i, this.taskMode));
- configureStreamingFork(fork, watermarkingStrategy);
+ configureStreamingFork(fork);
// Run the Fork
this.forks.put(Optional.<Fork>of(fork), Optional.<Future<?>>of(this.taskExecutor.submit(fork)));
} else {
@@ -445,10 +434,11 @@ public class Task implements TaskIFace {
SynchronousFork fork = closer.register(
new SynchronousFork(this.taskContext, schema instanceof Copyable ? ((Copyable) schema).copy() : schema,
branches, 0, this.taskMode));
- configureStreamingFork(fork, watermarkingStrategy);
+ configureStreamingFork(fork);
this.forks.put(Optional.<Fork>of(fork), Optional.<Future<?>> of(this.taskExecutor.submit(fork)));
}
+ LOG.info("Task mode streaming = " + isStreamingTask());
if (isStreamingTask()) {
// Start watermark manager and tracker
@@ -531,14 +521,13 @@ public class Task implements TaskIFace {
}
}
- protected void configureStreamingFork(Fork fork, String watermarkingStrategy) throws IOException {
+ protected void configureStreamingFork(Fork fork) throws IOException {
if (isStreamingTask()) {
DataWriter forkWriter = fork.getWriter();
- if (forkWriter instanceof WatermarkAwareWriter) {
- if (watermarkingStrategy.equals("WriterBased")) {
- ((MultiWriterWatermarkManager) this.watermarkManager.get()).registerWriter((WatermarkAwareWriter) forkWriter);
- }
- } else {
+ boolean isWaterMarkAwareWriter = (forkWriter instanceof WatermarkAwareWriter)
+ && ((WatermarkAwareWriter) forkWriter).isWatermarkCapable();
+
+ if (!isWaterMarkAwareWriter) {
String errorMessage = String.format("The Task is configured to run in continuous mode, "
+ "but the writer %s is not a WatermarkAwareWriter", forkWriter.getClass().getName());
LOG.error(errorMessage);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index 68f4536..7ce2bdf 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -496,8 +496,6 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
recordEnvelope.withRecord(convertedRecord));
}
}
- // ack this fork's processing done
- recordEnvelope.ack();
} else {
buildWriterIfNotPresent();
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
index ef57684..97da4df 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.runtime;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.gobblin.runtime.util.TaskMetrics;
+import org.apache.gobblin.util.TestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.testng.Assert;
@@ -58,8 +58,6 @@ import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.source.extractor.StreamingExtractor;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
-import org.apache.gobblin.source.workunit.Extract;
-import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.WatermarkAwareWriter;
@@ -220,6 +218,15 @@ public class TaskContinuousTest {
public void start(WatermarkStorage watermarkStorage) {
}
+
+ @Override
+ public void shutdown() throws JobShutdownException {
+ try {
+ this.close();
+ } catch (Exception e) {
+ throw new JobShutdownException("Failed to close extractor during shutdown");
+ }
+ }
}
@@ -252,90 +259,87 @@ public class TaskContinuousTest {
public void testContinuousTaskOneRecord()
throws Exception {
- ArrayList<Object> recordCollector = new ArrayList<>(100);
+ for (Boolean taskExecutionSync: new Boolean[]{true, false}) {
- String testRecord = "hello";
+ ArrayList<Object> recordCollector = new ArrayList<>(100);
- OneRecordExtractor oneRecordExtractor =
- new OneRecordExtractor(testRecord);
+ String testRecord = "hello";
+ OneRecordExtractor oneRecordExtractor = new OneRecordExtractor(testRecord);
- TaskContext mockTaskContext =
- getMockTaskContext(recordCollector, oneRecordExtractor);
-
- // Create a mock TaskPublisher
- TaskPublisher mockTaskPublisher = mock(TaskPublisher.class);
- when(mockTaskPublisher.canPublish()).thenReturn(TaskPublisher.PublisherState.SUCCESS);
- when(mockTaskContext.getTaskPublisher(any(TaskState.class), any(TaskLevelPolicyCheckResults.class))).thenReturn(mockTaskPublisher);
+ TaskContext mockTaskContext = getMockTaskContext(recordCollector, oneRecordExtractor, taskExecutionSync);
- // Create a mock TaskStateTracker
- TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
+ // Create a mock TaskPublisher
+ TaskPublisher mockTaskPublisher = mock(TaskPublisher.class);
+ when(mockTaskPublisher.canPublish()).thenReturn(TaskPublisher.PublisherState.SUCCESS);
+ when(mockTaskContext.getTaskPublisher(any(TaskState.class), any(TaskLevelPolicyCheckResults.class))).thenReturn(mockTaskPublisher);
- // Create a TaskExecutor - a real TaskExecutor must be created so a Fork is run in a separate thread
- TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+ // Create a mock TaskStateTracker
+ TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
- // Create the Task
- Task task = new Task(mockTaskContext, mockTaskStateTracker, taskExecutor, Optional.<CountDownLatch>absent());
+ // Create a TaskExecutor - a real TaskExecutor must be created so a Fork is run in a separate thread
+ TaskExecutor taskExecutor = new TaskExecutor(new Properties());
- ScheduledExecutorService taskRunner = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(Optional.of(log)));
+ // Create the Task
+ Task task = new Task(mockTaskContext, mockTaskStateTracker, taskExecutor, Optional.<CountDownLatch>absent());
- taskRunner.execute(task);
+ ScheduledExecutorService taskRunner = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(Optional.of(log)));
+ taskRunner.execute(task);
- // Let the task run for 2 seconds
- int sleepIterations = 2;
- int currentIteration = 0;
- WatermarkStorage mockWatermarkStorage = mockTaskContext.getWatermarkStorage();
- Map<String, CheckpointableWatermark> externalWatermarkStorage;
- while (currentIteration < sleepIterations) {
- Thread.sleep(1000);
- currentIteration++;
- externalWatermarkStorage =
- mockWatermarkStorage.getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList
- .of("default"));
- if (!externalWatermarkStorage.isEmpty()) {
- for (CheckpointableWatermark watermark : externalWatermarkStorage.values()) {
- log.info("Observed committed watermark: {}", watermark);
+ // Let the task run for 2 seconds
+ int sleepIterations = 2;
+ int currentIteration = 0;
+ WatermarkStorage mockWatermarkStorage = mockTaskContext.getWatermarkStorage();
+ Map<String, CheckpointableWatermark> externalWatermarkStorage;
+ while (currentIteration < sleepIterations) {
+ Thread.sleep(1000);
+ currentIteration++;
+ externalWatermarkStorage =
+ mockWatermarkStorage.getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList.of("default"));
+ if (!externalWatermarkStorage.isEmpty()) {
+ for (CheckpointableWatermark watermark : externalWatermarkStorage.values()) {
+ log.info("Observed committed watermark: {}", watermark);
+ }
+ log.info("Task progress: {}", task.getProgress());
+ // Ensure that watermarks seem reasonable at each step
+ Assert.assertTrue(oneRecordExtractor.validateWatermarks(false, externalWatermarkStorage));
}
- log.info("Task progress: {}", task.getProgress());
- // Ensure that watermarks seem reasonable at each step
- Assert.assertTrue(oneRecordExtractor.validateWatermarks(false, externalWatermarkStorage));
}
- }
-
- // Let's try to shutdown the task
- task.shutdown();
- log.info("Shutting down task now");
- boolean success = task.awaitShutdown(3000);
- Assert.assertTrue(success, "Task should shutdown in 3 seconds");
- log.info("Task done waiting to shutdown {}", success);
- externalWatermarkStorage =
- mockWatermarkStorage.getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList
- .of("0"));
+ // Let's try to shutdown the task
+ task.shutdown();
+ log.info("Shutting down task now");
+ boolean success = task.awaitShutdown(3000);
+ Assert.assertTrue(success, "Task should shutdown in 3 seconds");
+ log.info("Task done waiting to shutdown {}", success);
- // Ensure that committed watermarks match exactly the input rows because we shutdown in an orderly manner.
- Assert.assertTrue(oneRecordExtractor.validateWatermarks(true, externalWatermarkStorage));
+ externalWatermarkStorage =
+ mockWatermarkStorage.getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList.of("0"));
- // Ensure that the record made it to the writer correctly
- Assert.assertEquals(recordCollector.size(), 1);
- Assert.assertEquals(recordCollector.get(0), testRecord);
+ // Ensure that committed watermarks match exactly the input rows because we shutdown in an orderly manner.
+ Assert.assertTrue(oneRecordExtractor.validateWatermarks(true, externalWatermarkStorage));
+ // Ensure that the record made it to the writer correctly
+ Assert.assertEquals(recordCollector.size(), 1);
+ Assert.assertEquals(recordCollector.get(0), testRecord);
- task.commit();
+ task.commit();
+ Assert.assertTrue(mockTaskContext.getTaskState().getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL);
- // Shutdown the executor
- taskRunner.shutdown();
- taskRunner.awaitTermination(100, TimeUnit.MILLISECONDS);
+ // Shutdown the executor
+ taskRunner.shutdown();
+ taskRunner.awaitTermination(100, TimeUnit.MILLISECONDS);
+ }
}
private TaskContext getMockTaskContext(ArrayList<Object> recordCollector,
- Extractor mockExtractor)
+ Extractor mockExtractor, Boolean taskExecutionSync)
throws Exception {
- TaskState taskState = getStreamingTaskState();
+ TaskState taskState = getStreamingTaskState(taskExecutionSync);
// Create a mock RowLevelPolicyChecker
RowLevelPolicyChecker mockRowLevelPolicyChecker =
new RowLevelPolicyChecker(Lists.newArrayList(), "stateId", FileSystem.getLocal(new Configuration()));
@@ -364,14 +368,15 @@ public class TaskContinuousTest {
return mockTaskContext;
}
- private TaskState getStreamingTaskState() {
- WorkUnitState workUnitState = new WorkUnitState(WorkUnit.create(
- new Extract(Extract.TableType.SNAPSHOT_ONLY, this.getClass().getName(), this.getClass().getSimpleName())));
+ private TaskState getStreamingTaskState(Boolean taskExecutionSync) {
+ WorkUnitState workUnitState = TestUtils.createTestWorkUnitState();
+
workUnitState.setProp(ConfigurationKeys.TASK_KEY_KEY, "1234");
TaskState taskState = new TaskState(workUnitState);
taskState.setProp(ConfigurationKeys.METRICS_ENABLED_KEY, Boolean.toString(false));
taskState.setProp(TaskConfigurationKeys.TASK_EXECUTION_MODE, ExecutionModel.STREAMING.name());
+ taskState.setProp(ConfigurationKeys.TASK_SYNCHRONOUS_EXECUTION_MODEL_KEY, Boolean.toString(taskExecutionSync));
taskState.setJobId("1234");
taskState.setTaskId("testContinuousTaskId");
return taskState;
@@ -387,70 +392,65 @@ public class TaskContinuousTest {
@Test
public void testContinuousTask()
throws Exception {
- // Create a TaskState
- TaskState taskState = getStreamingTaskState();
- ArrayList<Object> recordCollector = new ArrayList<>(100);
- long perRecordExtractLatencyMillis = 1000; // 1 second per record
+ for (Boolean taskExecutionSync: new Boolean[]{true, false}) {
+ ArrayList<Object> recordCollector = new ArrayList<>(100);
+ long perRecordExtractLatencyMillis = 1000; // 1 second per record
+ ContinuousExtractor continuousExtractor = new ContinuousExtractor(perRecordExtractLatencyMillis);
+ TaskContext mockTaskContext = getMockTaskContext(recordCollector, continuousExtractor, taskExecutionSync);
- ContinuousExtractor continuousExtractor =
- new ContinuousExtractor(perRecordExtractLatencyMillis);
+ // Create a mock TaskStateTracker
+ TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
- TaskContext mockTaskContext = getMockTaskContext(recordCollector, continuousExtractor);
+ // Create a TaskExecutor - a real TaskExecutor must be created so a Fork is run in a separate thread
+ TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+ // Create the Task
+ Task task = new Task(mockTaskContext, mockTaskStateTracker, taskExecutor, Optional.<CountDownLatch>absent());
- // Create a mock TaskStateTracker
- TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
+ ScheduledExecutorService taskRunner = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(Optional.of(log)));
- // Create a TaskExecutor - a real TaskExecutor must be created so a Fork is run in a separate thread
- TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+ taskRunner.execute(task);
- // Create the Task
- Task task = new Task(mockTaskContext, mockTaskStateTracker, taskExecutor, Optional.<CountDownLatch>absent());
+ // Let the task run for 10 seconds
+ int sleepIterations = 10;
+ int currentIteration = 0;
- ScheduledExecutorService taskRunner = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(Optional.of(log)));
-
- taskRunner.execute(task);
-
-
- // Let the task run for 10 seconds
- int sleepIterations = 10;
- int currentIteration = 0;
-
- while (currentIteration < sleepIterations) {
- Thread.sleep(1000);
- currentIteration++;
- Map<String, CheckpointableWatermark> externalWatermarkStorage = mockTaskContext.getWatermarkStorage()
- .getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList.of("default"));
- if (!externalWatermarkStorage.isEmpty()) {
- for (CheckpointableWatermark watermark : externalWatermarkStorage.values()) {
- log.info("Observed committed watermark: {}", watermark);
+ while (currentIteration < sleepIterations) {
+ Thread.sleep(1000);
+ currentIteration++;
+ Map<String, CheckpointableWatermark> externalWatermarkStorage = mockTaskContext.getWatermarkStorage()
+ .getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList.of("default"));
+ if (!externalWatermarkStorage.isEmpty()) {
+ for (CheckpointableWatermark watermark : externalWatermarkStorage.values()) {
+ log.info("Observed committed watermark: {}", watermark);
+ }
+ log.info("Task progress: {}", task.getProgress());
+ // Ensure that watermarks seem reasonable at each step
+ Assert.assertTrue(continuousExtractor.validateWatermarks(false, externalWatermarkStorage));
}
- log.info("Task progress: {}", task.getProgress());
- // Ensure that watermarks seem reasonable at each step
- Assert.assertTrue(continuousExtractor.validateWatermarks(false, externalWatermarkStorage));
}
- }
- // Let's try to shutdown the task
- task.shutdown();
- log.info("Shutting down task now");
- boolean success = task.awaitShutdown(30000);
- Assert.assertTrue(success, "Task should shutdown in 3 seconds");
- log.info("Task done waiting to shutdown {}", success);
+ // Let's try to shutdown the task
+ task.shutdown();
+ log.info("Shutting down task now");
+ boolean success = task.awaitShutdown(30000);
+ Assert.assertTrue(success, "Task should shutdown in 3 seconds");
+ log.info("Task done waiting to shutdown {}", success);
- // Ensure that committed watermarks match exactly the input rows because we shutdown in an orderly manner.
- Assert.assertTrue(continuousExtractor.validateWatermarks(true, mockTaskContext.getWatermarkStorage()
- .getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList.of("default"))));
+ // Ensure that committed watermarks match exactly the input rows because we shutdown in an orderly manner.
+ Assert.assertTrue(continuousExtractor.validateWatermarks(true, mockTaskContext.getWatermarkStorage()
+ .getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList.of("default"))));
- task.commit();
-
- // Shutdown the executor
- taskRunner.shutdown();
- taskRunner.awaitTermination(100, TimeUnit.MILLISECONDS);
+ task.commit();
+ Assert.assertTrue(mockTaskContext.getTaskState().getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL);
+ // Shutdown the executor
+ taskRunner.shutdown();
+ taskRunner.awaitTermination(100, TimeUnit.MILLISECONDS);
+ }
}
@@ -491,23 +491,6 @@ public class TaskContinuousTest {
}
@Override
- public Map<String, CheckpointableWatermark> getCommittableWatermark() {
- CheckpointableWatermark committable = lastWatermark.get();
- if (committable != null) {
- Map<String, CheckpointableWatermark> singletonMap = new HashMap<>();
- singletonMap.put(source.get(), committable);
- return singletonMap;
- } else {
- return Collections.EMPTY_MAP;
- }
- }
-
- @Override
- public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
- return Collections.EMPTY_MAP;
- }
-
- @Override
public void commit()
throws IOException {