Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/11/23 06:22:55 UTC

[GitHub] [gobblin] phet commented on a change in pull request #3436: [GOBBLIN-1582] Fill low/high watermark info in SourceState for QueryBasedSource

phet commented on a change in pull request #3436:
URL: https://github.com/apache/gobblin/pull/3436#discussion_r754827725



##########
File path: gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
##########
@@ -168,4 +168,37 @@ public void testRunDuration() throws DataRecordException, IOException {
     Assert.assertTrue(Math.abs(timeSpentMicro - (RUN_DURATION_SECS * 1000000)) < (1000000),
         "Time spent " + timeSpentMicro);
   }
+
+  @Test
+  public void testThrowException() throws DataRecordException, IOException {
+    final int MEM_ALLOC_BYTES = 100;
+    final int NUM_WORK_UNITS = 1;
+    final int SLEEP_TIME_MICRO = 1000;
+    final int NUM_RECORDS = 30; // this config is ignored since the duration is set
+    final int RUN_DURATION_SECS = 5;
+
+    SourceState state = new SourceState();
+    state.setProp(StressTestingSource.NUM_WORK_UNITS_KEY, NUM_WORK_UNITS);
+    state.setProp(StressTestingSource.MEM_ALLOC_BYTES_KEY, MEM_ALLOC_BYTES);
+    state.setProp(StressTestingSource.SLEEP_TIME_MICRO_KEY, SLEEP_TIME_MICRO);
+    state.setProp(StressTestingSource.NUM_RECORDS_KEY, NUM_RECORDS);
+    state.setProp(StressTestingSource.RUN_DURATION_KEY, RUN_DURATION_SECS);
+    state.setProp(StressTestingSource.THROW_EXCEPTION, true);
+
+    StressTestingSource source = new StressTestingSource();
+
+    List<WorkUnit> wus = source.getWorkunits(state);
+    Assert.assertEquals(wus.size(), NUM_WORK_UNITS);
+
+    WorkUnit wu = wus.get(0);
+    WorkUnitState wuState = new WorkUnitState(wu, state);
+    Extractor<String, byte[]> extractor = source.getExtractor(wuState);
+
+    Assert.expectThrows(IOException.class, () -> {
+      byte[] record;
+      while ((record = extractor.readRecord(null)) != null) {
+        Assert.assertEquals(record.length, 100);

Review comment:
       Minor, but for clarity I might replace this assertion with `Assert.fail("should have thrown!")`, since the loop body should never run if `readRecord` throws as expected.
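
A minimal sketch of the suggested pattern, in plain Java so it runs without TestNG. The `RecordReader` interface and the `expectThrows` helper are hypothetical stand-ins for the extractor and for TestNG's `Assert.expectThrows` used in the diff; they are assumptions for illustration, not the project's code. The point is only that a body which must never execute reads more clearly as an explicit failure than as a length check.

```java
import java.io.IOException;

final class ExpectThrowsSketch {
  /** Hypothetical stand-in for an extractor that is configured to throw on read. */
  interface RecordReader {
    byte[] readRecord() throws IOException;
  }

  /** Hypothetical stand-in for a test body that may throw anything. */
  interface ThrowingRunnable {
    void run() throws Throwable;
  }

  /** Simplified stand-in for an expectThrows-style assertion: returns the expected exception. */
  static <T extends Throwable> T expectThrows(Class<T> type, ThrowingRunnable body) {
    try {
      body.run();
    } catch (Throwable t) {
      if (type.isInstance(t)) {
        return type.cast(t);
      }
      // Any other throwable (including an explicit failure in the body) fails the check.
      throw new AssertionError("unexpected exception type: " + t.getClass().getName(), t);
    }
    throw new AssertionError("expected " + type.getSimpleName() + " but nothing was thrown");
  }

  /** The check itself: reaching the loop body means the reader did not throw. */
  static IOException runCheck(RecordReader reader) {
    return expectThrows(IOException.class, () -> {
      while (reader.readRecord() != null) {
        throw new AssertionError("should have thrown!");
      }
    });
  }

  public static void main(String[] args) {
    IOException e = runCheck(() -> {
      throw new IOException("simulated failure");
    });
    System.out.println("caught: " + e.getMessage());
  }
}
```

With this shape, a reader that unexpectedly returns a record fails the check immediately instead of passing a record-length assertion that says nothing about the exception.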

##########
File path: gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
##########
@@ -241,7 +244,13 @@ public int hashCode() {
       addLineageSourceInfo(state, sourceEntity, workunit);
       partition.serialize(workunit);
       workUnits.add(workunit);
+      highestWaterMark = Math.max(highestWaterMark, partition.getHighWatermark());
+      lowestWaterMark = Math.min(lowestWaterMark, partition.getLowWatermark());
     }
+    state.appendToListProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD,
+        String.format("%s.%s: %s", sourceEntity.getDatasetName(), sourceEntity.destTableName, highestWaterMark));
+    state.appendToListProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD,
+        String.format("%s.%s: %s", sourceEntity.getDatasetName(), sourceEntity.destTableName, lowestWaterMark));

Review comment:
       Could `partitions` ever be empty? If so, I recommend making the lowest and highest watermarks `Optional` and writing the properties only when a value is present, rather than setting them to `MAX_VALUE` and `-1`.
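
One way the conditional write could look, as a sketch only. The `Partition` class below is a hypothetical stand-in carrying just the two watermark values, and `describe` is an illustrative helper name; neither is taken from the PR. It uses `OptionalLong` from the JDK so that an empty partition list produces no value at all instead of a sentinel.

```java
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;

final class WatermarkRangeSketch {
  /** Hypothetical stand-in for a partition; only the watermark bounds matter here. */
  static final class Partition {
    final long low;
    final long high;

    Partition(long low, long high) {
      this.low = low;
      this.high = high;
    }
  }

  /** Highest high watermark, or empty when there are no partitions. */
  static OptionalLong highest(List<Partition> partitions) {
    return partitions.stream().mapToLong(p -> p.high).max();
  }

  /** Lowest low watermark, or empty when there are no partitions. */
  static OptionalLong lowest(List<Partition> partitions) {
    return partitions.stream().mapToLong(p -> p.low).min();
  }

  /** Formats the property value in the same "dataset.table: value" shape, only if a value exists. */
  static Optional<String> describe(String dataset, String table, OptionalLong watermark) {
    return watermark.isPresent()
        ? Optional.of(String.format("%s.%s: %s", dataset, table, watermark.getAsLong()))
        : Optional.empty();
  }
}
```

The caller would then write each property only when present, for example `describe(...).ifPresent(v -> state.appendToListProp(key, v))`, where `key` stands for the relevant watermark field constant.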

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobExecutionEventSubmitter.java
##########
@@ -77,6 +78,8 @@ private void submitJobStateEvent(JobState jobState) {
     jobMetadataBuilder.put(METADATA_JOB_COMPLETED_TASKS, Integer.toString(jobState.getCompletedTasks()));
     jobMetadataBuilder.put(METADATA_JOB_LAUNCHER_TYPE, jobState.getLauncherType().toString());
     jobMetadataBuilder.put(METADATA_JOB_TRACKING_URL, jobState.getTrackingURL().or(UNKNOWN_VALUE));
+    jobMetadataBuilder.put(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, ""));
+    jobMetadataBuilder.put(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, ""));

Review comment:
       This may not be an issue, I'm just curious: when do we use `""` as the default, and when `UNKNOWN_VALUE` instead?



