You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/06/28 05:27:12 UTC

incubator-gobblin git commit: [GOBBLIN-520] Add fileSetWorkUnitGenerator customization for CopySource

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 6936e0d19 -> 6ef8cddc7


[GOBBLIN-520] Add fileSetWorkUnitGenerator customization for CopySource

Closes #2390 from yukuai518/wuGenerator


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6ef8cddc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6ef8cddc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6ef8cddc

Branch: refs/heads/master
Commit: 6ef8cddc7710585a3b222ad8b570e5f77b22f26f
Parents: 6936e0d
Author: Kuai Yu <ku...@linkedin.com>
Authored: Wed Jun 27 22:26:30 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Jun 27 22:26:38 2018 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  5 ++
 .../data/management/copy/CopySource.java        | 67 ++++++++++++--------
 .../src/main/resources/stressTest.conf          | 19 ++++++
 .../apache/gobblin/util/PropertiesUtils.java    |  2 +-
 4 files changed, 64 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6ef8cddc/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 50e6020..a56d4b7 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -528,6 +528,11 @@ public class ConfigurationKeys {
   public static final String SQL_SERVER_CONNECTION_PARAMETERS = "source.querybased.sqlserver.connectionParameters";
 
   /**
+   * Configuration properties used by the CopySource.
+   */
+  public static final String COPY_SOURCE_FILESET_WU_GENERATOR_CLASS = "copy.source.fileset.wu.generator.class";
+
+  /**
    * Configuration properties used by the FileBasedExtractor
    */
   public static final String SOURCE_FILEBASED_DATA_DIRECTORY = "source.filebased.data.directory";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6ef8cddc/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 3355f3d..b1ce7a6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -46,6 +46,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.SetMultimap;
 
+import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
@@ -78,6 +79,7 @@ import org.apache.gobblin.source.extractor.extract.AbstractSource;
 import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnitWeighter;
+import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.WriterUtils;
@@ -86,6 +88,7 @@ import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking;
 import org.apache.gobblin.util.deprecation.DeprecationUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
 import org.apache.gobblin.util.guid.Guid;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.util.request_allocation.GreedyAllocator;
 import org.apache.gobblin.util.request_allocation.HierarchicalAllocator;
 import org.apache.gobblin.util.request_allocation.HierarchicalPrioritizer;
@@ -205,13 +208,19 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
       //Submit alertable events for unfulfilled requests
       submitUnfulfilledRequestEvents(allocator);
 
+      String filesetWuGeneratorAlias = state.getProp(ConfigurationKeys.COPY_SOURCE_FILESET_WU_GENERATOR_CLASS, FileSetWorkUnitGenerator.class.getName());
       Iterator<Callable<Void>> callableIterator =
           Iterators.transform(prioritizedFileSets, new Function<FileSet<CopyEntity>, Callable<Void>>() {
             @Nullable
             @Override
             public Callable<Void> apply(FileSet<CopyEntity> input) {
-              return new FileSetWorkUnitGenerator((CopyableDatasetBase) input.getDataset(), input, state, workUnitsMap,
-                  watermarkGenerator, minWorkUnitWeight);
+              try {
+                return GobblinConstructorUtils.<FileSetWorkUnitGenerator>invokeLongestConstructor(
+                    new ClassAliasResolver(FileSetWorkUnitGenerator.class).resolveClass(filesetWuGeneratorAlias),
+                    input.getDataset(), input, state, workUnitsMap, watermarkGenerator, minWorkUnitWeight, lineageInfo);
+              } catch (Exception e) {
+                throw new RuntimeException("Cannot create workunits generator", e);
+              }
             }
           });
 
@@ -315,15 +324,17 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
   /**
    * {@link Runnable} to generate copy listing for one {@link CopyableDataset}.
    */
+  @Alias("FileSetWorkUnitGenerator")
   @AllArgsConstructor
-  private class FileSetWorkUnitGenerator implements Callable<Void> {
+  public static class FileSetWorkUnitGenerator implements Callable<Void> {
 
-    private final CopyableDatasetBase copyableDataset;
-    private final FileSet<CopyEntity> fileSet;
-    private final State state;
-    private final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitList;
-    private final Optional<CopyableFileWatermarkGenerator> watermarkGenerator;
-    private final long minWorkUnitWeight;
+    protected final CopyableDatasetBase copyableDataset;
+    protected final FileSet<CopyEntity> fileSet;
+    protected final State state;
+    protected final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitList;
+    protected final Optional<CopyableFileWatermarkGenerator> watermarkGenerator;
+    protected final long minWorkUnitWeight;
+    protected final Optional<LineageInfo> lineageInfo;
 
     @Override
     public Void call() {
@@ -362,19 +373,31 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
             ioe);
       }
     }
-  }
 
-  private void addLineageInfo(CopyEntity copyEntity, WorkUnit workUnit) {
-    if (copyEntity instanceof CopyableFile) {
-      CopyableFile copyableFile = (CopyableFile) copyEntity;
+    private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> watermarkGenerator,
+        CopyEntity copyEntity)
+        throws IOException {
+      if (copyEntity instanceof CopyableFile) {
+        Optional<WatermarkInterval> watermarkIntervalOptional =
+            CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile) copyEntity, watermarkGenerator);
+        if (watermarkIntervalOptional.isPresent()) {
+          workUnit.setWatermarkInterval(watermarkIntervalOptional.get());
+        }
+      }
+    }
+
+    private void addLineageInfo(CopyEntity copyEntity, WorkUnit workUnit) {
+      if (copyEntity instanceof CopyableFile) {
+        CopyableFile copyableFile = (CopyableFile) copyEntity;
       /*
        * In Gobblin Distcp, the source and target path info of a CopyableFile are determined by its dataset found by
        * a DatasetFinder. Consequently, the source and destination dataset for the CopyableFile lineage are expected
        * to be set by the same logic
        */
-      if (lineageInfo.isPresent() && copyableFile.getSourceDataset() != null
-          && copyableFile.getDestinationDataset() != null) {
-        lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit);
+        if (lineageInfo.isPresent() && copyableFile.getSourceDataset() != null
+            && copyableFile.getDestinationDataset() != null) {
+          lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit);
+        }
       }
     }
   }
@@ -505,16 +528,4 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
   public static CopyableDatasetMetadata deserializeCopyableDataset(State state) {
     return CopyableDatasetMetadata.deserialize(state.getProp(SERIALIZED_COPYABLE_DATASET));
   }
-
-  private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> watermarkGenerator,
-      CopyEntity copyEntity)
-      throws IOException {
-    if (copyEntity instanceof CopyableFile) {
-      Optional<WatermarkInterval> watermarkIntervalOptional =
-          CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile) copyEntity, watermarkGenerator);
-      if (watermarkIntervalOptional.isPresent()) {
-        workUnit.setWatermarkInterval(watermarkIntervalOptional.get());
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6ef8cddc/gobblin-example/src/main/resources/stressTest.conf
----------------------------------------------------------------------
diff --git a/gobblin-example/src/main/resources/stressTest.conf b/gobblin-example/src/main/resources/stressTest.conf
new file mode 100644
index 0000000..594a080
--- /dev/null
+++ b/gobblin-example/src/main/resources/stressTest.conf
@@ -0,0 +1,19 @@
+job.name=stressTest1
+job.group=GobblinSamples
+
+stressTest.numWorkUnits=2
+stressTest.numRecords=20000
+stressTest.computeTimeMicro=900
+stressTest.sleepTimeMicro=2100
+
+source.class=org.apache.gobblin.util.test.StressTestingSource
+
+writer.builder.class=org.apache.gobblin.writer.test.GobblinTestEventBusWriter$Builder
+writer.output.format=txt
+data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
+
+# Work paths
+state.store.enabled=false
+
+# Miscellaneous
+job.lock.enabled=false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6ef8cddc/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index 4ab6db8..c41273c 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -62,7 +62,7 @@ public class PropertiesUtils {
   }
 
   public static long getPropAsLong(Properties properties, String key, long defaultValue) {
-    return Long.valueOf(properties.getProperty(key, Long.toString(defaultValue)));
+    return Long.parseLong(properties.getProperty(key, Long.toString(defaultValue)));
   }
 
   /**