You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:53 UTC

[35/50] incubator-gobblin git commit: [GOBBLIN-413] Use same compaction start time for time lookback check during compaction

[GOBBLIN-413] Use same compaction start time for time lookback check during compaction

Closes #2289 from yukuai518/compacttime


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a7a85e15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a7a85e15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a7a85e15

Branch: refs/heads/0.12.0
Commit: a7a85e150474b8911b0b92114781a30105b77822
Parents: a3189d7
Author: Kuai Yu <ku...@linkedin.com>
Authored: Wed Feb 21 14:14:48 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Feb 21 14:14:48 2018 -0800

----------------------------------------------------------------------
 .../compaction/source/CompactionSource.java     |   5 +-
 .../verify/CompactionTimeRangeVerifier.java     |   7 +-
 .../gobblin/azkaban/AzkabanJobLauncher.java     |  24 +++-
 .../runtime/listeners/CompositeJobListener.java | 133 +++++++++++++++++++
 .../listeners/EmailNotificationJobListener.java |   2 +
 5 files changed, 166 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index 4e8d3e0..f11378f 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTimeUtils;
 
 import java.io.IOException;
 import java.net.URI;
@@ -94,6 +95,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 @Slf4j
 public class CompactionSource implements WorkUnitStreamSource<String, String> {
+  public static final String COMPACTION_INIT_TIME = "compaction.init.time";
   private CompactionSuite suite;
   private Path tmpJobDir;
   private FileSystem fs;
@@ -108,6 +110,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
   public WorkUnitStream getWorkunitStream(SourceState state) {
     try {
       fs = getSourceFileSystem(state);
+      state.setProp(COMPACTION_INIT_TIME, DateTimeUtils.currentTimeMillis());
       suite = CompactionSuiteUtils.getCompactionSuiteFactory(state).createSuite(state);
 
       initRequestAllocator(state);
@@ -433,7 +436,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
     }
   }
 
-  protected FileSystem getSourceFileSystem(State state)
+  public static FileSystem getSourceFileSystem(State state)
           throws IOException {
     Configuration conf = HadoopUtils.getConfFromState(state);
     String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
index 85eca40..a267ab5 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.source.CompactionSource;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import lombok.AllArgsConstructor;
@@ -50,19 +51,19 @@ public class CompactionTimeRangeVerifier implements CompactionVerifier<FileSyste
       CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
       DateTime folderTime = result.getTime();
       DateTimeZone timeZone = DateTimeZone.forID(this.state.getProp(MRCompactor.COMPACTION_TIMEZONE, MRCompactor.DEFAULT_COMPACTION_TIMEZONE));
-      DateTime current = new DateTime(timeZone);
+      DateTime compactionStartTime = new DateTime(this.state.getPropAsLong(CompactionSource.COMPACTION_INIT_TIME), timeZone);
       PeriodFormatter formatter = new PeriodFormatterBuilder().appendMonths().appendSuffix("m").appendDays().appendSuffix("d").appendHours()
               .appendSuffix("h").toFormatter();
 
       // get earliest time
       String maxTimeAgoStr = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
       Period maxTimeAgo = formatter.parsePeriod(maxTimeAgoStr);
-      earliest = current.minus(maxTimeAgo);
+      earliest = compactionStartTime.minus(maxTimeAgo);
 
       // get latest time
       String minTimeAgoStr = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
       Period minTimeAgo = formatter.parsePeriod(minTimeAgoStr);
-      latest = current.minus(minTimeAgo);
+      latest = compactionStartTime.minus(minTimeAgo);
 
       if (earliest.isBefore(folderTime) && latest.isAfter(folderTime)) {
         log.debug("{} falls in the user defined time range", dataset.datasetRoot());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 45b2f40..2a7d311 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator;
+import org.apache.gobblin.runtime.listeners.CompositeJobListener;
+import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
@@ -98,6 +100,7 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
   private static final Logger LOG = Logger.getLogger(AzkabanJobLauncher.class);
 
   public static final String GOBBLIN_LOG_LEVEL_KEY = "gobblin.log.levelOverride";
+  public static final String GOBBLIN_CUSTOM_JOB_LISTENERS = "gobblin.custom.job.listeners";
   public static final String TEMPLATE_KEY = "gobblin.template.uri";
 
   private static final String HADOOP_FS_DEFAULT_NAME = "fs.default.name";
@@ -115,7 +118,8 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
 
   private final Closer closer = Closer.create();
   private final JobLauncher jobLauncher;
-  private final JobListener jobListener = new EmailNotificationJobListener();
+  private final JobListener jobListener;
+
   private final Properties props;
   private final ApplicationLauncher applicationLauncher;
   private final long ownAzkabanSla;
@@ -134,6 +138,9 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
     this.props = new Properties();
     this.props.putAll(props);
 
+    // initialize job listeners after properties have been initialized
+    this.jobListener = initJobListener();
+
     // load dynamic configuration and add them to the job properties
     Config propsAsConfig = ConfigUtils.propertiesToConfig(props);
     DynamicConfigGenerator dynamicConfigGenerator =
@@ -217,6 +224,21 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
         this.closer.register(new ServiceBasedAppLauncher(jobProps, "Azkaban-" + UUID.randomUUID()));
   }
 
+  private JobListener initJobListener() {
+    CompositeJobListener compositeJobListener = new CompositeJobListener();
+    List<String> listeners = new State(props).getPropAsList(GOBBLIN_CUSTOM_JOB_LISTENERS, EmailNotificationJobListener.class.getSimpleName());
+    try {
+      for (String listenerAlias: listeners) {
+        ClassAliasResolver<JobListener> conditionClassAliasResolver = new ClassAliasResolver<>(JobListener.class);
+        compositeJobListener.addJobListener(conditionClassAliasResolver.resolveClass(listenerAlias).newInstance());
+      }
+    } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
+      throw new IllegalArgumentException(e);
+    }
+
+    return compositeJobListener;
+  }
+
   @Override
   public void run()
       throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/CompositeJobListener.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/CompositeJobListener.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/CompositeJobListener.java
new file mode 100644
index 0000000..bfdc4c8
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/CompositeJobListener.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.listeners;
+
+import java.util.List;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.gobblin.runtime.JobContext;
+
+import com.google.common.collect.Lists;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+@AllArgsConstructor
+public class CompositeJobListener extends AbstractJobListener {
+  private List<JobListener> listeners = Lists.newArrayList();
+
+  public CompositeJobListener() {
+  }
+
+  public void addJobListener(JobListener listener) {
+    this.listeners.add(listener);
+  }
+
+
+  @Override
+  public void onJobPrepare(JobContext jobContext) throws Exception {
+    StringBuffer buf = new StringBuffer();
+    for (JobListener listener: listeners) {
+      try {
+        listener.onJobPrepare(jobContext);
+      } catch (Exception e) {
+        buf.append(listener.getClass().getName() + ":" + e.toString());
+        log.error(ExceptionUtils.getFullStackTrace(e));
+      }
+    }
+
+    String exceptions = buf.toString();
+    if (!exceptions.isEmpty()) {
+      throw new RuntimeException(exceptions);
+    }
+  }
+
+  @Override
+  public void onJobStart(JobContext jobContext) throws Exception {
+    StringBuffer buf = new StringBuffer();
+    for (JobListener listener: listeners) {
+      try {
+        listener.onJobStart(jobContext);
+      } catch (Exception e) {
+        buf.append(listener.getClass().getName() + ":" + e.toString());
+        log.error(ExceptionUtils.getFullStackTrace(e));
+      }
+    }
+
+    String exceptions = buf.toString();
+    if (!exceptions.isEmpty()) {
+      throw new RuntimeException(exceptions);
+    }
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext) throws Exception {
+    StringBuffer buf = new StringBuffer();
+    for (JobListener listener: listeners) {
+      try {
+        listener.onJobCompletion(jobContext);
+      } catch (Exception e) {
+        buf.append(listener.getClass().getName() + ":" + e.toString());
+        log.error(ExceptionUtils.getFullStackTrace(e));
+      }
+    }
+
+    String exceptions = buf.toString();
+    if (!exceptions.isEmpty()) {
+      throw new RuntimeException(exceptions);
+    }
+  }
+
+  @Override
+  public void onJobCancellation(JobContext jobContext) throws Exception {
+    StringBuffer buf = new StringBuffer();
+    for (JobListener listener: listeners) {
+      try {
+        listener.onJobCancellation(jobContext);
+      } catch (Exception e) {
+        buf.append(listener.getClass().getName() + ":" + e.toString());
+        log.error(ExceptionUtils.getFullStackTrace(e));
+      }
+    }
+
+    String exceptions = buf.toString();
+    if (!exceptions.isEmpty()) {
+      throw new RuntimeException(exceptions);
+    }
+  }
+
+  @Override
+  public void onJobFailure(JobContext jobContext) throws Exception {
+    StringBuffer buf = new StringBuffer();
+    for (JobListener listener: listeners) {
+      try {
+        listener.onJobFailure(jobContext);
+      } catch (Exception e) {
+        buf.append(listener.getClass().getName() + ":" + e.toString());
+        log.error(ExceptionUtils.getFullStackTrace(e));
+      }
+    }
+
+    String exceptions = buf.toString();
+    if (!exceptions.isEmpty()) {
+      throw new RuntimeException(exceptions);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
index 3106f4d..feb0d92 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.runtime.listeners;
 
 import org.apache.commons.mail.EmailException;
+import org.apache.gobblin.annotation.Alias;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,6 +33,7 @@ import org.apache.gobblin.util.EmailUtils;
  *
  * @author Yinan Li
  */
+@Alias("EmailNotificationJobListener")
 public class EmailNotificationJobListener extends AbstractJobListener {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(EmailNotificationJobListener.class);