Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/11/29 18:50:01 UTC

[gobblin] branch master updated: [GOBBLIN-1582] Fill low/high watermark info in SourceState for QueryBasedSource (#3436)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 374fd6f  [GOBBLIN-1582] Fill low/high watermark info in SourceState for QueryBasedSource (#3436)
374fd6f is described below

commit 374fd6faf7b7268b4b70c53eb86c9c7bc9c49270
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Mon Nov 29 10:49:53 2021 -0800

    [GOBBLIN-1582] Fill low/high watermark info in SourceState for QueryBasedSource (#3436)
    
    * [GOBBLIN-1582] Fill low/high watermark info in SourceState for QueryBasedSource
    
    * add unit test
    
    * address comments to make high/low watermark optional
    
    * Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s (#3414)
    
    * Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s
    
    * Add missing file, `MysqlNonFlowSpecStoreTest`
    
    * Fixup `MysqlNonFlowSpecStoreTest`
    
    * Simplify implementation of `MysqlSpecStore.getSpecsImpl`.
    
    * Rename `MysqlNonFlowSpecStore` to `MysqlBaseFlowSpecStore`.
    
    * Aid maintainers with additional code comments
    
    * [GOBBLIN-1557] Make KafkaSource getFilteredTopics method protected (#3408)
    
    The method was originally private, and it is useful to be able to
    override it in subclasses, to redefine how to get topics to be processed.
    
    Change-Id: If94cda2f7a5e65e52e2453427c60f4abb932b3f8
    
    * [GOBBLIN-1567] do not set a custom maxConnLifetime for sql connection (#3418)
    
    * do not set a custom maxConnLifetime for sql connection
    
    * address review comment
    
    * [GOBBLIN-1568] Exponential backoff for Salesforce bulk api polling (#3420)
    
    * Exponential backoff for Salesforce bulk api polling
    
    * Read min and max wait time from prop with default
    
    * set RETENTION_DATASET_ROOT in CleanableIcebergDataset so that any retention job can use this information (#3422)
    
    * [GOBBLIN-1569] Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage (#3421)
    
    * Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage
    
    * Streamline `JobSpecDeserializer` error handling, on review feedback.
    
    * Refactor `GsonJobSpecSerDe` into a reusable `GenericGsonSpecSerDe`.
    
    * Fix javadoc slipup
    
    * Tag metrics with proxy url if available (#3423)
    
    * remove use of deprecated helix class (#3424)
    
    codestyle changes
    
    * [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container (#3419)
    
    * [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container
    
    * address comments
    
    * change the way we set the low watermark to better indicate the watermark range of the snapshot
    
    * address comments
    
    * fix test error
    
    * [GOBBLIN-1552] determine flow status correctly when dag manager is disabled (#3403)
    
    * determine flow status based on whether the dag manager is enabled
    this is needed because when the dag manager is not enabled, flow-level events are not emitted
    and cannot be used to determine flow status. in that case, flow status has to be determined
    from the job statuses.
    
    store flow status in the FlowStatus
    
    * address review comments
    
    * address review comments
    
    * removed a commented line
    
    * [GOBBLIN-1564] codestyle changes, typo corrections, improved javadoc  and fix a sync… (#3415)
    
    * codestyle changes, typo corrections, improved javadoc, and fix a synchronization issue
    
    * address review comments
    
    * add review comments
    
    * address review comments
    
    * address review comments
    
    * fix bugsFixMain
    
    * do not delete data while dropping a hive table because data is deleted, if needed, separately (#3431)
    
    * [GOBBLIN-1574] Added whitelist for iceberg tables to add new partitio… (#3426)
    
    * [GOBBLIN-1574] Added whitelist for iceberg tables to add new partition column
    
    * fix to failing test case
    
    * Updated IcebergMetadataWriterTest to blacklist the test table from non-completeness tests
    
    * moved dataset name update in tablemetadata
    
    * Added newPartition checks in Table Metadata
    
    * Fixed test case to include new_parition_enabled
    
    Co-authored-by: Vikram Bohra <vb...@vbohra-mn1.linkedin.biz>
    
    * [GOBBLIN-1577] change the multiplier used in ExponentialWaitStrategy to a reasonable… (#3430)
    
    * change the multiplier used in ExponentialWaitStrategy to 1 second. the old 2 ms multiplier retried too quickly for some use cases
    
    * .
    
    * [GOBBLIN-1580] Check table exists instead of call create table directly to make sure table exists (#3432)
    
    * [hotfix] workaround to catch exception when iceberg does not support get metrics for non-union type
    
    * address comments
    
    * [GOBBLIN-1580]Check table exists instead of call create table directly to make sure table exists
    
    * [GOBBLIN-1573]Fix the ClassNotFoundException in streaming test pipeline (#3425)
    
    * [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container
    
    * address comments
    
    * address comments
    
    * [GOBBLIN-1573]Fix the ClassNotFoundException in streaming test pipeline
    
    * [GOBBLIN-1576] skip appending record count to staging file if present… (#3429)
    
    * [GOBBLIN-1576] skip appending record count to staging file if present already
    
    * fixed checkstyle
    
    * fixed method
    
    Co-authored-by: Vikram Bohra <vb...@vbohra-mn1.linkedin.biz>
    
    * fix the NPE in dagManager
    
    * fix quota check issue in dagManager
    
    * address comments
    
    Co-authored-by: Kip Kohn <ck...@linkedin.com>
    Co-authored-by: Joseph Allemandou <jo...@gmail.com>
    Co-authored-by: Arjun Singh Bora <ab...@linkedin.com>
    Co-authored-by: Jiashuo Wang <wi...@umich.edu>
    Co-authored-by: William Lo <lo...@gmail.com>
    Co-authored-by: vbohra <vb...@linkedin.com>
    Co-authored-by: Vikram Bohra <vb...@vbohra-mn1.linkedin.biz>
---
 .../source/extractor/extract/QueryBasedSource.java | 11 ++++++++
 .../runtime/JobExecutionEventSubmitter.java        |  3 ++
 .../service/modules/orchestration/DagManager.java  |  7 +++--
 .../gobblin/util/test/StressTestingSource.java     |  7 +++++
 .../gobblin/util/test/TestStressTestingSource.java | 33 ++++++++++++++++++++++
 5 files changed, 59 insertions(+), 2 deletions(-)
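
Note on the QueryBasedSource change below: the loop keeps a running minimum of
the partitions' low watermarks and a running maximum of their high watermarks,
and records them only when at least one partition exists. A minimal,
dependency-free sketch of that pattern follows. It is not Gobblin code: `Range`
is a hypothetical stand-in for `Partition`, `overallRange` and `format` are
illustrative names, and `java.util.Optional` is used in place of the Guava
`Optional` that the commit uses.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class WatermarkRangeSketch {
  /** Hypothetical stand-in for a partition; only the two watermark bounds are modeled. */
  static final class Range {
    final long low;
    final long high;

    Range(long low, long high) {
      this.low = low;
      this.high = high;
    }
  }

  /** Returns {lowest, highest} across all ranges, or empty when the list is empty. */
  static Optional<long[]> overallRange(List<Range> ranges) {
    Optional<Long> lowest = Optional.empty();
    Optional<Long> highest = Optional.empty();
    for (Range r : ranges) {
      // The first range seeds each bound; later ranges can only widen it.
      lowest = Optional.of(lowest.map(lw -> Math.min(lw, r.low)).orElse(r.low));
      highest = Optional.of(highest.map(hw -> Math.max(hw, r.high)).orElse(r.high));
    }
    if (lowest.isPresent() && highest.isPresent()) {
      return Optional.of(new long[] {lowest.get(), highest.get()});
    }
    return Optional.empty();
  }

  /** Uses the same "%s.%s: %s" layout as the strings appended in the diff. */
  static String format(String dataset, String table, long watermark) {
    return String.format("%s.%s: %s", dataset, table, watermark);
  }

  public static void main(String[] args) {
    List<Range> ranges = Arrays.asList(new Range(20, 30), new Range(5, 25));
    overallRange(ranges).ifPresent(bounds -> {
      System.out.println(format("db", "table", bounds[0]));
      System.out.println(format("db", "table", bounds[1]));
    });
  }
}
```

Leaving both bounds empty for a source with no partitions means nothing is
written to the state, which matches the "make high/low watermark optional"
item in the commit message.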

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
index 5d5330d..d86f9ef 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.slf4j.MDC;
 
 import com.google.common.base.Optional;
@@ -233,6 +234,8 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
       extract.setFullTrue(System.currentTimeMillis());
     }
 
+    Optional<Long> highestWaterMark = Optional.absent();
+    Optional<Long> lowestWaterMark = Optional.absent();
     for (Partition partition : partitions) {
       WorkUnit workunit = WorkUnit.create(extract);
       workunit.setProp(ConfigurationKeys.SOURCE_ENTITY, sourceEntity.getSourceEntityName());
@@ -241,6 +244,14 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
       addLineageSourceInfo(state, sourceEntity, workunit);
       partition.serialize(workunit);
       workUnits.add(workunit);
+      highestWaterMark = highestWaterMark.isPresent() ?
+          highestWaterMark.transform(hw -> Math.max(hw, partition.getHighWatermark())) : Optional.of(partition.getHighWatermark());
+      lowestWaterMark = lowestWaterMark.isPresent() ?
+          lowestWaterMark.transform(lw -> Math.min(lw, partition.getLowWatermark())) : Optional.of(partition.getLowWatermark());
+    }
+    if(highestWaterMark.isPresent() && lowestWaterMark.isPresent()) {
+      state.appendToListProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, String.format("%s.%s: %s", sourceEntity.getDatasetName(), sourceEntity.destTableName, highestWaterMark.get()));
+      state.appendToListProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, String.format("%s.%s: %s", sourceEntity.getDatasetName(), sourceEntity.destTableName, lowestWaterMark.get()));
     }
 
     return workUnits;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobExecutionEventSubmitter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobExecutionEventSubmitter.java
index 3a3caad..9b6d18c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobExecutionEventSubmitter.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobExecutionEventSubmitter.java
@@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 
 import lombok.AllArgsConstructor;
+import org.apache.gobblin.metrics.event.TimingEvent;
 
 
 @AllArgsConstructor
@@ -77,6 +78,8 @@ public class JobExecutionEventSubmitter {
     jobMetadataBuilder.put(METADATA_JOB_COMPLETED_TASKS, Integer.toString(jobState.getCompletedTasks()));
     jobMetadataBuilder.put(METADATA_JOB_LAUNCHER_TYPE, jobState.getLauncherType().toString());
     jobMetadataBuilder.put(METADATA_JOB_TRACKING_URL, jobState.getTrackingURL().or(UNKNOWN_VALUE));
+    jobMetadataBuilder.put(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, ""));
+    jobMetadataBuilder.put(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, ""));
     jobMetadataBuilder.put(EventSubmitter.EVENT_TYPE, JOB_STATE);
 
     this.eventSubmitter.submit(JOB_STATE, jobMetadataBuilder.build());
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 4b304e5..bfc1f7a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -969,7 +969,7 @@ public class DagManager extends AbstractIdleService {
 
       if (proxyUser != null) {
         proxyQuotaIncrement = incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
-        proxyUserCheck = proxyQuotaIncrement < 0;  // proxy user quota check failed
+        proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check succeeds
         if (!proxyUserCheck) {
           requesterMessage.append(String.format(
               "Quota exceeded for proxy user %s on executor %s : quota=%s, runningJobs=%d%n",
@@ -985,7 +985,7 @@ public class DagManager extends AbstractIdleService {
             .map(ServiceRequester::getName).distinct().collect(Collectors.toList());
         for (String requester : uniqueRequesters) {
           int userQuotaIncrement = incrementJobCountAndCheckUserQuota(requesterToJobCount, requester, dagNode);
-          boolean thisRequesterCheck = userQuotaIncrement < 0;  // user quota check failed
+          boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota check succeeds
           usersQuotaIncrement.add(requester);
           requesterCheck = requesterCheck && thisRequesterCheck;
           if (!thisRequesterCheck) {
@@ -1112,6 +1112,9 @@ public class DagManager extends AbstractIdleService {
 
     private void decrementQuotaUsage(Map<String, Integer> quotaMap, String user) {
       Integer currentCount;
+      if (user == null) {
+        return;
+      }
       do {
         currentCount = quotaMap.get(user);
       } while (currentCount != null && currentCount > 0 && !quotaMap.replace(user, currentCount, currentCount - 1));
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java
index 5d70219..3fd10d0 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java
@@ -54,6 +54,8 @@ public class StressTestingSource implements Source<String, byte[]> {
   public static final int DEFAULT_NUM_RECORDS = 1;
   public static final String MEM_ALLOC_BYTES_KEY = CONFIG_NAMESPACE + "." + "memAllocBytes";
   public static final int DEFAULT_MEM_ALLOC_BYTES = 8;
+  public static final String THROW_EXCEPTION = CONFIG_NAMESPACE + "." + "throwException";
+  public static final boolean DEFAULT_THROW_EXCEPTION = false;
 
   private static final long INVALID_TIME = -1;
 
@@ -94,6 +96,7 @@ public class StressTestingSource implements Source<String, byte[]> {
     private final int numRecords;
     private final int memAllocBytes;
     private final Random random;
+    private final boolean throwException;
 
     public ExtractorImpl(WorkUnitState state) {
       this.random = new Random();
@@ -113,6 +116,7 @@ public class StressTestingSource implements Source<String, byte[]> {
       // num records only takes effect if the duration is not specified
       this.numRecords = this.endTime == INVALID_TIME ? state.getPropAsInt(NUM_RECORDS_KEY, DEFAULT_NUM_RECORDS) : 0;
       this.memAllocBytes = state.getPropAsInt(MEM_ALLOC_BYTES_KEY, DEFAULT_MEM_ALLOC_BYTES);
+      this.throwException = state.getPropAsBoolean(THROW_EXCEPTION, DEFAULT_THROW_EXCEPTION);
     }
 
     @Override
@@ -134,6 +138,9 @@ public class StressTestingSource implements Source<String, byte[]> {
       // If an end time is configured then it is used as the stopping point otherwise the record count limit is used
       if ((this.endTime != INVALID_TIME && System.currentTimeMillis() > this.endTime) ||
           (this.numRecords > 0 && this.recordsEmitted >= this.numRecords)) {
+        if (this.throwException) {
+          throw new IOException("This is one test exception");
+        }
         return null;
       }
 
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
index 08ce408..6577547 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
@@ -168,4 +168,37 @@ public class TestStressTestingSource {
     Assert.assertTrue(Math.abs(timeSpentMicro - (RUN_DURATION_SECS * 1000000)) < (1000000),
         "Time spent " + timeSpentMicro);
   }
+
+  @Test
+  public void testThrowException() throws DataRecordException, IOException {
+    final int MEM_ALLOC_BYTES = 100;
+    final int NUM_WORK_UNITS = 1;
+    final int SLEEP_TIME_MICRO = 1000;
+    final int NUM_RECORDS = 30; // this config is ignored since the duration is set
+    final int RUN_DURATION_SECS = 5;
+
+    SourceState state = new SourceState();
+    state.setProp(StressTestingSource.NUM_WORK_UNITS_KEY, NUM_WORK_UNITS);
+    state.setProp(StressTestingSource.MEM_ALLOC_BYTES_KEY, MEM_ALLOC_BYTES);
+    state.setProp(StressTestingSource.SLEEP_TIME_MICRO_KEY, SLEEP_TIME_MICRO);
+    state.setProp(StressTestingSource.NUM_RECORDS_KEY, NUM_RECORDS);
+    state.setProp(StressTestingSource.RUN_DURATION_KEY, RUN_DURATION_SECS);
+    state.setProp(StressTestingSource.THROW_EXCEPTION, true);
+
+    StressTestingSource source = new StressTestingSource();
+
+    List<WorkUnit> wus = source.getWorkunits(state);
+    Assert.assertEquals(wus.size(), NUM_WORK_UNITS);
+
+    WorkUnit wu = wus.get(0);
+    WorkUnitState wuState = new WorkUnitState(wu, state);
+    Extractor<String, byte[]> extractor = source.getExtractor(wuState);
+
+    Assert.expectThrows(IOException.class, () -> {
+      byte[] record;
+      while ((record = extractor.readRecord(null)) != null) {
+        Assert.assertEquals(record.length, 100);
+      }
+    });
+  }
 }
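
Note on the DagManager hunk above: `decrementQuotaUsage` is a
compare-and-replace retry loop, and the commit adds a guard that returns early
for a null user. The sketch below reproduces that shape in isolation. It
assumes the quota map is a `ConcurrentHashMap` (the diff only shows the `Map`
interface), and the class and method names are illustrative, not Gobblin's.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class QuotaDecrementSketch {
  /** Decrements the count for user, never going below zero; absent or null users are a no-op. */
  static void decrement(Map<String, Integer> quotaMap, String user) {
    if (user == null) {
      // ConcurrentHashMap throws NullPointerException on a null key, so bail out first.
      return;
    }
    Integer current;
    do {
      current = quotaMap.get(user);
      // replace(key, old, new) succeeds only if the value is still `current`;
      // if another thread changed it in between, re-read and try again.
    } while (current != null && current > 0 && !quotaMap.replace(user, current, current - 1));
  }

  public static void main(String[] args) {
    Map<String, Integer> quota = new ConcurrentHashMap<>();
    quota.put("alice", 2);
    decrement(quota, "alice");
    decrement(quota, null);
    System.out.println(quota);
  }
}
```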