You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:25 UTC

[07/50] incubator-gobblin git commit: [GOBBLIN-391] Use the DataPublisherFactory to allow sharing publisher…

[GOBBLIN-391] Use the DataPublisherFactory to allow sharing publisher…

Closes #2267 from htran1/share_publisher


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/378ccaa8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/378ccaa8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/378ccaa8

Branch: refs/heads/0.12.0
Commit: 378ccaa8a253a1eda873ffbe74300c6bf8a755e8
Parents: 41fd2b9
Author: Hung Tran <hu...@linkedin.com>
Authored: Fri Jan 26 11:45:31 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Jan 26 11:45:31 2018 -0800

----------------------------------------------------------------------
 .../gobblin/publisher/DataPublisherFactory.java |  12 ++-
 .../gobblin/runtime/SafeDatasetCommit.java      |  27 +++--
 .../runtime/mapreduce/MRJobLauncherTest.java    | 103 +++++++++++++++++++
 3 files changed, 133 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/378ccaa8/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
index 4e565ad..8d77fd6 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
@@ -54,6 +54,16 @@ public class DataPublisherFactory<S extends ScopeType<S>>
     }
   }
 
+  /**
+   * Whether a publisher may be cached and shared through the SharedResourcesBroker.
+   * @param publisher the publisher to inspect; only its {@link Capability#THREADSAFE} support is checked
+   * @return true if the publisher supports {@link Capability#THREADSAFE} (and is therefore cacheable), else false
+   */
+  public static boolean isPublisherCacheable(DataPublisher publisher) {
+    // only threadsafe publishers are cacheable. non-threadsafe publishers are marked immediately for invalidation
+    return publisher.supportsCapability(Capability.THREADSAFE, Collections.<String, Object>emptyMap());
+  }
+
   @Override
   public String getName() {
     return FACTORY_NAME;
@@ -75,7 +85,7 @@ public class DataPublisherFactory<S extends ScopeType<S>>
       // by the broker.
       // Otherwise, it is not shareable, so return it as an immediately invalidated resource that will only be returned
       // once from the broker.
-      if (publisher.supportsCapability(Capability.THREADSAFE, Collections.EMPTY_MAP)) {
+      if (isPublisherCacheable(publisher)) {
         return new ResourceInstance<>(publisher);
       } else {
         return new ImmediatelyInvalidResourceEntry<>(publisher);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/378ccaa8/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 43e5c59..7e2e9fa 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.metrics.event.FailureEventBuilder;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.publisher.CommitSequencePublisher;
 import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.publisher.DataPublisherFactory;
 import org.apache.gobblin.publisher.UnpublishedHandling;
 import org.apache.gobblin.runtime.commit.DatasetStateCommitStep;
 import org.apache.gobblin.runtime.task.TaskFactory;
@@ -133,9 +134,23 @@ final class SafeDatasetCommit implements Callable<Void> {
             }
             generateCommitSequenceBuilder(this.datasetState, entry.getValue());
           } else {
-            DataPublisher publisher = taskFactory == null ? closer
-                .register(DataPublisher.getInstance(dataPublisherClass, this.jobContext.getJobState()))
-                : taskFactory.createDataPublisher(this.datasetState);
+            DataPublisher publisher;
+
+            if (taskFactory == null) {
+              publisher = DataPublisherFactory.get(dataPublisherClass.getName(), this.jobContext.getJobState(),
+                  this.jobContext.getJobBroker());
+
+              // non-threadsafe publishers are not shareable and are not retained in the broker, so register them with
+              // the closer
+              if (!DataPublisherFactory.isPublisherCacheable(publisher)) {
+                closer.register(publisher);
+              }
+            } else {
+              // NOTE: sharing of publishers is not supported when they are instantiated through the TaskFactory.
+              // This should be revisited if sharing is required.
+              publisher = taskFactory.createDataPublisher(this.datasetState);
+            }
+
             if (this.isJobCancelled) {
               if (publisher.canBeSkipped()) {
                 log.warn(publisher.getClass() + " will be skipped.");
@@ -160,11 +175,7 @@ final class SafeDatasetCommit implements Callable<Void> {
           this.datasetState.setState(JobState.RunningState.COMMITTED);
         }
       }
-    } catch (ReflectiveOperationException roe) {
-      log.error(String.format("Failed to instantiate data publisher for dataset %s of job %s.", this.datasetUrn,
-          this.jobContext.getJobId()), roe);
-      throw new RuntimeException(roe);
-    } catch (Throwable throwable) {
+    }  catch (Throwable throwable) {
       log.error(String.format("Failed to commit dataset state for dataset %s of job %s", this.datasetUrn,
           this.jobContext.getJobId()), throwable);
       throw new RuntimeException(throwable);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/378ccaa8/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
index 8d4f308..e8e996e 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
@@ -20,7 +20,10 @@ package org.apache.gobblin.runtime.mapreduce;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collection;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
 import org.jboss.byteman.contrib.bmunit.BMNGRunner;
@@ -37,14 +40,17 @@ import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.capability.Capability;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.DynamicConfigGenerator;
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.runtime.JobLauncherTestHelper;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.util.limiter.BaseLimiterType;
@@ -360,6 +366,55 @@ public class MRJobLauncherTest extends BMNGRunner {
     }
   }
 
+  @Test
+  public void testLaunchJobWithNonThreadsafeDataPublisher() throws Exception {
+    final Logger log = LoggerFactory.getLogger(getClass().getName() + ".testLaunchJobWithNonThreadsafeDataPublisher");
+    log.info("in");
+    Properties jobProps = loadJobProps();
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+        jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithNonThreadsafeDataPublisher");
+    jobProps.setProperty(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE, TestNonThreadsafeDataPublisher.class.getName());
+
+    // the counter is a static shared with the threadsafe test, so reset it before running this job
+    TestNonThreadsafeDataPublisher.instantiatedCount.set(0);
+
+    try {
+      this.jobLauncherTestHelper.runTestWithMultipleDatasets(jobProps);
+    } finally {
+      this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+    }
+
+    // Non-threadsafe publishers are not cached by the broker, so one is created per dataset (assumes 4 datasets)
+    Assert.assertEquals(TestNonThreadsafeDataPublisher.instantiatedCount.get(), 4);
+
+    log.info("out");
+  }
+
+  @Test
+  public void testLaunchJobWithThreadsafeDataPublisher() throws Exception {
+    final Logger log = LoggerFactory.getLogger(getClass().getName() + ".testLaunchJobWithThreadsafeDataPublisher");
+    log.info("in");
+    Properties jobProps = loadJobProps();
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+        jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithThreadsafeDataPublisher");
+    jobProps.setProperty(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE, TestThreadsafeDataPublisher.class.getName());
+
+    // instantiatedCount is the inherited static of TestNonThreadsafeDataPublisher, so reset it before running
+    TestThreadsafeDataPublisher.instantiatedCount.set(0);
+
+    try {
+      this.jobLauncherTestHelper.runTestWithMultipleDatasets(jobProps);
+    } finally {
+      this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+    }
+
+    // Threadsafe publishers are cached in the job broker, so a single instance is shared by all datasets
+    Assert.assertEquals(TestThreadsafeDataPublisher.instantiatedCount.get(), 1);
+
+    log.info("out");
+  }
+
+
   @AfterClass(alwaysRun = true)
   public void tearDown() throws IOException {
     if (testMetastoreDatabase != null) {
@@ -390,4 +445,52 @@ public class MRJobLauncherTest extends BMNGRunner {
           JobLauncherTestHelper.DYNAMIC_VALUE1));
     }
   }
+
+  public static class TestNonThreadsafeDataPublisher extends DataPublisher {
+    // counts instantiations; TestThreadsafeDataPublisher inherits this same static field, so each test resets it first
+    static final AtomicInteger instantiatedCount = new AtomicInteger(0);
+
+    public TestNonThreadsafeDataPublisher(State state) {
+      super(state);
+      instantiatedCount.incrementAndGet();
+    }
+
+    @Override
+    public void initialize() throws IOException {
+    }
+
+    @Override
+    public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
+      for (WorkUnitState workUnitState : states) {
+        // No data is moved: every work unit is marked COMMITTED unconditionally,
+        // unlike a real publisher that would only commit successful tasks.
+        // This stub exists only so the tests can count publisher instantiations.
+        workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+      }
+    }
+
+    @Override
+    public void publishMetadata(Collection<? extends WorkUnitState> states) throws IOException {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public boolean supportsCapability(Capability c, Map<String, Object> properties) {
+      return c == DataPublisher.REUSABLE; // deliberately not THREADSAFE, so the factory will not cache it
+    }
+  }
+
+  public static class TestThreadsafeDataPublisher extends TestNonThreadsafeDataPublisher {
+    public TestThreadsafeDataPublisher(State state) { // the parent constructor increments instantiatedCount
+      super(state);
+    }
+
+    @Override // adds THREADSAFE to the parent's capabilities, which makes the publisher cacheable
+    public boolean supportsCapability(Capability c, Map<String, Object> properties) {
+      return c == Capability.THREADSAFE || super.supportsCapability(c, properties);
+    }
+  }
 }