You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/06/28 05:27:12 UTC
incubator-gobblin git commit: [GOBBLIN-520] Add
fileSetWorkUnitGenerator customization for CopySource
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 6936e0d19 -> 6ef8cddc7
[GOBBLIN-520] Add fileSetWorkUnitGenerator customization for CopySource
Closes #2390 from yukuai518/wuGenerator
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6ef8cddc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6ef8cddc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6ef8cddc
Branch: refs/heads/master
Commit: 6ef8cddc7710585a3b222ad8b570e5f77b22f26f
Parents: 6936e0d
Author: Kuai Yu <ku...@linkedin.com>
Authored: Wed Jun 27 22:26:30 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Jun 27 22:26:38 2018 -0700
----------------------------------------------------------------------
.../configuration/ConfigurationKeys.java | 5 ++
.../data/management/copy/CopySource.java | 67 ++++++++++++--------
.../src/main/resources/stressTest.conf | 19 ++++++
.../apache/gobblin/util/PropertiesUtils.java | 2 +-
4 files changed, 64 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6ef8cddc/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 50e6020..a56d4b7 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -528,6 +528,11 @@ public class ConfigurationKeys {
public static final String SQL_SERVER_CONNECTION_PARAMETERS = "source.querybased.sqlserver.connectionParameters";
/**
+ * Configuration properties used by the CopySource.
+ */
+ public static final String COPY_SOURCE_FILESET_WU_GENERATOR_CLASS = "copy.source.fileset.wu.generator.class";
+
+ /**
* Configuration properties used by the FileBasedExtractor
*/
public static final String SOURCE_FILEBASED_DATA_DIRECTORY = "source.filebased.data.directory";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6ef8cddc/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 3355f3d..b1ce7a6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -46,6 +46,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
+import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
@@ -78,6 +79,7 @@ import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitWeighter;
+import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.WriterUtils;
@@ -86,6 +88,7 @@ import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking;
import org.apache.gobblin.util.deprecation.DeprecationUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import org.apache.gobblin.util.guid.Guid;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.util.request_allocation.GreedyAllocator;
import org.apache.gobblin.util.request_allocation.HierarchicalAllocator;
import org.apache.gobblin.util.request_allocation.HierarchicalPrioritizer;
@@ -205,13 +208,19 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
//Submit alertable events for unfulfilled requests
submitUnfulfilledRequestEvents(allocator);
+ String filesetWuGeneratorAlias = state.getProp(ConfigurationKeys.COPY_SOURCE_FILESET_WU_GENERATOR_CLASS, FileSetWorkUnitGenerator.class.getName());
Iterator<Callable<Void>> callableIterator =
Iterators.transform(prioritizedFileSets, new Function<FileSet<CopyEntity>, Callable<Void>>() {
@Nullable
@Override
public Callable<Void> apply(FileSet<CopyEntity> input) {
- return new FileSetWorkUnitGenerator((CopyableDatasetBase) input.getDataset(), input, state, workUnitsMap,
- watermarkGenerator, minWorkUnitWeight);
+ try {
+ return GobblinConstructorUtils.<FileSetWorkUnitGenerator>invokeLongestConstructor(
+ new ClassAliasResolver(FileSetWorkUnitGenerator.class).resolveClass(filesetWuGeneratorAlias),
+ input.getDataset(), input, state, workUnitsMap, watermarkGenerator, minWorkUnitWeight, lineageInfo);
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot create workunits generator", e);
+ }
}
});
@@ -315,15 +324,17 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
/**
* {@link Callable} to generate copy listing for one {@link CopyableDataset}.
*/
+ @Alias("FileSetWorkUnitGenerator")
@AllArgsConstructor
- private class FileSetWorkUnitGenerator implements Callable<Void> {
+ public static class FileSetWorkUnitGenerator implements Callable<Void> {
- private final CopyableDatasetBase copyableDataset;
- private final FileSet<CopyEntity> fileSet;
- private final State state;
- private final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitList;
- private final Optional<CopyableFileWatermarkGenerator> watermarkGenerator;
- private final long minWorkUnitWeight;
+ protected final CopyableDatasetBase copyableDataset;
+ protected final FileSet<CopyEntity> fileSet;
+ protected final State state;
+ protected final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitList;
+ protected final Optional<CopyableFileWatermarkGenerator> watermarkGenerator;
+ protected final long minWorkUnitWeight;
+ protected final Optional<LineageInfo> lineageInfo;
@Override
public Void call() {
@@ -362,19 +373,31 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
ioe);
}
}
- }
- private void addLineageInfo(CopyEntity copyEntity, WorkUnit workUnit) {
- if (copyEntity instanceof CopyableFile) {
- CopyableFile copyableFile = (CopyableFile) copyEntity;
+ private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> watermarkGenerator,
+ CopyEntity copyEntity)
+ throws IOException {
+ if (copyEntity instanceof CopyableFile) {
+ Optional<WatermarkInterval> watermarkIntervalOptional =
+ CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile) copyEntity, watermarkGenerator);
+ if (watermarkIntervalOptional.isPresent()) {
+ workUnit.setWatermarkInterval(watermarkIntervalOptional.get());
+ }
+ }
+ }
+
+ private void addLineageInfo(CopyEntity copyEntity, WorkUnit workUnit) {
+ if (copyEntity instanceof CopyableFile) {
+ CopyableFile copyableFile = (CopyableFile) copyEntity;
/*
* In Gobblin Distcp, the source and target path info of a CopyableFile are determined by its dataset found by
* a DatasetFinder. Consequently, the source and destination dataset for the CopyableFile lineage are expected
* to be set by the same logic
*/
- if (lineageInfo.isPresent() && copyableFile.getSourceDataset() != null
- && copyableFile.getDestinationDataset() != null) {
- lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit);
+ if (lineageInfo.isPresent() && copyableFile.getSourceDataset() != null
+ && copyableFile.getDestinationDataset() != null) {
+ lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit);
+ }
}
}
}
@@ -505,16 +528,4 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
public static CopyableDatasetMetadata deserializeCopyableDataset(State state) {
return CopyableDatasetMetadata.deserialize(state.getProp(SERIALIZED_COPYABLE_DATASET));
}
-
- private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> watermarkGenerator,
- CopyEntity copyEntity)
- throws IOException {
- if (copyEntity instanceof CopyableFile) {
- Optional<WatermarkInterval> watermarkIntervalOptional =
- CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile) copyEntity, watermarkGenerator);
- if (watermarkIntervalOptional.isPresent()) {
- workUnit.setWatermarkInterval(watermarkIntervalOptional.get());
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6ef8cddc/gobblin-example/src/main/resources/stressTest.conf
----------------------------------------------------------------------
diff --git a/gobblin-example/src/main/resources/stressTest.conf b/gobblin-example/src/main/resources/stressTest.conf
new file mode 100644
index 0000000..594a080
--- /dev/null
+++ b/gobblin-example/src/main/resources/stressTest.conf
@@ -0,0 +1,19 @@
+job.name=stressTest1
+job.group=GobblinSamples
+
+stressTest.numWorkUnits=2
+stressTest.numRecords=20000
+stressTest.computeTimeMicro=900
+stressTest.sleepTimeMicro=2100
+
+source.class=org.apache.gobblin.util.test.StressTestingSource
+
+writer.builder.class=org.apache.gobblin.writer.test.GobblinTestEventBusWriter$Builder
+writer.output.format=txt
+data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
+
+# Work paths
+state.store.enabled=false
+
+# Miscellaneous
+job.lock.enabled=false
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6ef8cddc/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index 4ab6db8..c41273c 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -62,7 +62,7 @@ public class PropertiesUtils {
}
public static long getPropAsLong(Properties properties, String key, long defaultValue) {
- return Long.valueOf(properties.getProperty(key, Long.toString(defaultValue)));
+ return Long.parseLong(properties.getProperty(key, Long.toString(defaultValue)));
}
/**