You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:25 UTC
[07/50] incubator-gobblin git commit: [GOBBLIN-391] Use the DataPublisherFactory to allow sharing publisher…
[GOBBLIN-391] Use the DataPublisherFactory to allow sharing publisher…
Closes #2267 from htran1/share_publisher
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/378ccaa8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/378ccaa8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/378ccaa8
Branch: refs/heads/0.12.0
Commit: 378ccaa8a253a1eda873ffbe74300c6bf8a755e8
Parents: 41fd2b9
Author: Hung Tran <hu...@linkedin.com>
Authored: Fri Jan 26 11:45:31 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Jan 26 11:45:31 2018 -0800
----------------------------------------------------------------------
.../gobblin/publisher/DataPublisherFactory.java | 12 ++-
.../gobblin/runtime/SafeDatasetCommit.java | 27 +++--
.../runtime/mapreduce/MRJobLauncherTest.java | 103 +++++++++++++++++++
3 files changed, 133 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/378ccaa8/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
index 4e565ad..8d77fd6 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
@@ -54,6 +54,16 @@ public class DataPublisherFactory<S extends ScopeType<S>>
}
}
+ /**
+ * Is the publisher cacheable in the SharedResourcesBroker?
+ *
+ * <p>Only publishers that report {@link Capability#THREADSAFE} are cacheable. Non-threadsafe publishers are handed
+ * out as immediately invalidated resources, so the broker returns each such instance only once.
+ *
+ * @param publisher the publisher whose capabilities are inspected
+ * @return true if cacheable, else false
+ */
+ public static boolean isPublisherCacheable(DataPublisher publisher) {
+ // use the type-safe emptyMap() rather than the raw Collections.EMPTY_MAP constant (avoids an unchecked warning)
+ return publisher.supportsCapability(Capability.THREADSAFE, Collections.<String, Object>emptyMap());
+ }
+
@Override
public String getName() {
return FACTORY_NAME;
@@ -75,7 +85,7 @@ public class DataPublisherFactory<S extends ScopeType<S>>
// by the broker.
// Otherwise, it is not shareable, so return it as an immediately invalidated resource that will only be returned
// once from the broker.
- if (publisher.supportsCapability(Capability.THREADSAFE, Collections.EMPTY_MAP)) {
+ if (isPublisherCacheable(publisher)) {
return new ResourceInstance<>(publisher);
} else {
return new ImmediatelyInvalidResourceEntry<>(publisher);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/378ccaa8/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 43e5c59..7e2e9fa 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.metrics.event.FailureEventBuilder;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.publisher.CommitSequencePublisher;
import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.publisher.DataPublisherFactory;
import org.apache.gobblin.publisher.UnpublishedHandling;
import org.apache.gobblin.runtime.commit.DatasetStateCommitStep;
import org.apache.gobblin.runtime.task.TaskFactory;
@@ -133,9 +134,23 @@ final class SafeDatasetCommit implements Callable<Void> {
}
generateCommitSequenceBuilder(this.datasetState, entry.getValue());
} else {
- DataPublisher publisher = taskFactory == null ? closer
- .register(DataPublisher.getInstance(dataPublisherClass, this.jobContext.getJobState()))
- : taskFactory.createDataPublisher(this.datasetState);
+ DataPublisher publisher;
+
+ if (taskFactory == null) {
+ publisher = DataPublisherFactory.get(dataPublisherClass.getName(), this.jobContext.getJobState(),
+ this.jobContext.getJobBroker());
+
+ // non-threadsafe publishers are not shareable and are not retained in the broker, so register them with
+ // the closer
+ if (!DataPublisherFactory.isPublisherCacheable(publisher)) {
+ closer.register(publisher);
+ }
+ } else {
+ // NOTE: sharing of publishers is not supported when they are instantiated through the TaskFactory.
+ // This should be revisited if sharing is required.
+ publisher = taskFactory.createDataPublisher(this.datasetState);
+ }
+
if (this.isJobCancelled) {
if (publisher.canBeSkipped()) {
log.warn(publisher.getClass() + " will be skipped.");
@@ -160,11 +175,7 @@ final class SafeDatasetCommit implements Callable<Void> {
this.datasetState.setState(JobState.RunningState.COMMITTED);
}
}
- } catch (ReflectiveOperationException roe) {
- log.error(String.format("Failed to instantiate data publisher for dataset %s of job %s.", this.datasetUrn,
- this.jobContext.getJobId()), roe);
- throw new RuntimeException(roe);
- } catch (Throwable throwable) {
+ } catch (Throwable throwable) {
log.error(String.format("Failed to commit dataset state for dataset %s of job %s", this.datasetUrn,
this.jobContext.getJobId()), throwable);
throw new RuntimeException(throwable);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/378ccaa8/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
index 8d4f308..e8e996e 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
@@ -20,7 +20,10 @@ package org.apache.gobblin.runtime.mapreduce;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Collection;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.jboss.byteman.contrib.bmunit.BMNGRunner;
@@ -37,14 +40,17 @@ import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.capability.Capability;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.DynamicConfigGenerator;
+import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.runtime.JobLauncherTestHelper;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.util.limiter.BaseLimiterType;
@@ -360,6 +366,55 @@ public class MRJobLauncherTest extends BMNGRunner {
}
}
+ @Test
+ public void testLaunchJobWithNonThreadsafeDataPublisher() throws Exception {
+ // A different publisher is used for each dataset
+ // (4 presumably matches the number of datasets runTestWithMultipleDatasets creates — verify if that helper changes)
+ runJobAndCountPublisherInstances("testLaunchJobWithNonThreadsafeDataPublisher",
+ TestNonThreadsafeDataPublisher.class, TestNonThreadsafeDataPublisher.instantiatedCount, 4);
+ }
+
+ @Test
+ public void testLaunchJobWithThreadsafeDataPublisher() throws Exception {
+ // The same publisher is used for all the data sets
+ runJobAndCountPublisherInstances("testLaunchJobWithThreadsafeDataPublisher",
+ TestThreadsafeDataPublisher.class, TestThreadsafeDataPublisher.instantiatedCount, 1);
+ }
+
+ /**
+ * Runs a multi-dataset job using {@code publisherClass} as the data publisher and asserts how many publisher
+ * instances were constructed.
+ *
+ * @param testName suffix appended to the job name and logger name
+ * @param publisherClass publisher implementation configured via {@link ConfigurationKeys#JOB_DATA_PUBLISHER_TYPE}
+ * @param instantiatedCount the publisher's static instantiation counter; reset to 0 before the job runs
+ * @param expectedInstances expected value of {@code instantiatedCount} after the job completes
+ */
+ private void runJobAndCountPublisherInstances(String testName, Class<? extends DataPublisher> publisherClass,
+ AtomicInteger instantiatedCount, int expectedInstances) throws Exception {
+ final Logger log = LoggerFactory.getLogger(getClass().getName() + "." + testName);
+ log.info("in");
+ Properties jobProps = loadJobProps();
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+ jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-" + testName);
+ jobProps.setProperty(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE, publisherClass.getName());
+
+ // make sure the count starts from 0 (the counter is static, so it survives across tests)
+ instantiatedCount.set(0);
+
+ try {
+ this.jobLauncherTestHelper.runTestWithMultipleDatasets(jobProps);
+ } finally {
+ this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ }
+
+ Assert.assertEquals(instantiatedCount.get(), expectedInstances);
+
+ log.info("out");
+ }
+
+
@AfterClass(alwaysRun = true)
public void tearDown() throws IOException {
if (testMetastoreDatabase != null) {
@@ -390,4 +445,52 @@ public class MRJobLauncherTest extends BMNGRunner {
JobLauncherTestHelper.DYNAMIC_VALUE1));
}
}
+
+ /**
+ * Test publisher that does not advertise {@link Capability#THREADSAFE}, so it is not shareable across datasets.
+ * Counts its constructions so tests can check how many instances were created.
+ */
+ public static class TestNonThreadsafeDataPublisher extends DataPublisher {
+ // for counting how many times the object is instantiated in the test case.
+ // final: tests only reset the value with set(0), never reassign the reference.
+ // NOTE: subclasses (e.g. TestThreadsafeDataPublisher) inherit and share this same static counter.
+ static final AtomicInteger instantiatedCount = new AtomicInteger(0);
+
+ public TestNonThreadsafeDataPublisher(State state) {
+ super(state);
+ instantiatedCount.incrementAndGet();
+ }
+
+ @Override
+ public void initialize() throws IOException {
+ // no-op: nothing to set up for this test publisher
+ }
+
+ @Override
+ public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
+ for (WorkUnitState workUnitState : states) {
+ // Mark every task as COMMITTED; no data is actually moved by this test publisher.
+ // This matches what the COMMIT_ON_PARTIAL_SUCCESS policy expects after a successful publish.
+ workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+ }
+ }
+
+ @Override
+ public void publishMetadata(Collection<? extends WorkUnitState> states) throws IOException {
+ // no-op: this test publisher has no metadata to publish
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op: no resources are held
+ }
+
+ /**
+ * Reports only {@code DataPublisher.REUSABLE}. Deliberately does not report THREADSAFE, so the
+ * DataPublisherFactory will not cache this publisher in the broker.
+ */
+ @Override
+ public boolean supportsCapability(Capability c, Map<String, Object> properties) {
+ return c == DataPublisher.REUSABLE;
+ }
+ }
+
+ /**
+ * Variant of TestNonThreadsafeDataPublisher that also advertises {@link Capability#THREADSAFE}, which lets the
+ * DataPublisherFactory keep a single shared instance in the broker.
+ */
+ public static class TestThreadsafeDataPublisher extends TestNonThreadsafeDataPublisher {
+ public TestThreadsafeDataPublisher(State state) {
+ super(state);
+ }
+
+ @Override
+ public boolean supportsCapability(Capability c, Map<String, Object> properties) {
+ // thread-safety is the capability that distinguishes this class from its parent
+ if (c == Capability.THREADSAFE) {
+ return true;
+ }
+ return c == DataPublisher.REUSABLE;
+ }
+ }
}