You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:53 UTC
[35/50] incubator-gobblin git commit: [GOBBLIN-413] Use same compaction start time for time lookback check during compaction
[GOBBLIN-413] Use same compaction start time for time lookback check during compaction
Closes #2289 from yukuai518/compacttime
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a7a85e15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a7a85e15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a7a85e15
Branch: refs/heads/0.12.0
Commit: a7a85e150474b8911b0b92114781a30105b77822
Parents: a3189d7
Author: Kuai Yu <ku...@linkedin.com>
Authored: Wed Feb 21 14:14:48 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Feb 21 14:14:48 2018 -0800
----------------------------------------------------------------------
.../compaction/source/CompactionSource.java | 5 +-
.../verify/CompactionTimeRangeVerifier.java | 7 +-
.../gobblin/azkaban/AzkabanJobLauncher.java | 24 +++-
.../runtime/listeners/CompositeJobListener.java | 133 +++++++++++++++++++
.../listeners/EmailNotificationJobListener.java | 2 +
5 files changed, 166 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index 4e8d3e0..f11378f 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTimeUtils;
import java.io.IOException;
import java.net.URI;
@@ -94,6 +95,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
@Slf4j
public class CompactionSource implements WorkUnitStreamSource<String, String> {
+ public static final String COMPACTION_INIT_TIME = "compaction.init.time";
private CompactionSuite suite;
private Path tmpJobDir;
private FileSystem fs;
@@ -108,6 +110,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
public WorkUnitStream getWorkunitStream(SourceState state) {
try {
fs = getSourceFileSystem(state);
+ state.setProp(COMPACTION_INIT_TIME, DateTimeUtils.currentTimeMillis());
suite = CompactionSuiteUtils.getCompactionSuiteFactory(state).createSuite(state);
initRequestAllocator(state);
@@ -433,7 +436,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
}
}
- protected FileSystem getSourceFileSystem(State state)
+ public static FileSystem getSourceFileSystem(State state)
throws IOException {
Configuration conf = HadoopUtils.getConfFromState(state);
String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
index 85eca40..a267ab5 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.source.CompactionSource;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
import lombok.AllArgsConstructor;
@@ -50,19 +51,19 @@ public class CompactionTimeRangeVerifier implements CompactionVerifier<FileSyste
CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
DateTime folderTime = result.getTime();
DateTimeZone timeZone = DateTimeZone.forID(this.state.getProp(MRCompactor.COMPACTION_TIMEZONE, MRCompactor.DEFAULT_COMPACTION_TIMEZONE));
- DateTime current = new DateTime(timeZone);
+ DateTime compactionStartTime = new DateTime(this.state.getPropAsLong(CompactionSource.COMPACTION_INIT_TIME), timeZone);
PeriodFormatter formatter = new PeriodFormatterBuilder().appendMonths().appendSuffix("m").appendDays().appendSuffix("d").appendHours()
.appendSuffix("h").toFormatter();
// get earliest time
String maxTimeAgoStr = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
Period maxTimeAgo = formatter.parsePeriod(maxTimeAgoStr);
- earliest = current.minus(maxTimeAgo);
+ earliest = compactionStartTime.minus(maxTimeAgo);
// get latest time
String minTimeAgoStr = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
Period minTimeAgo = formatter.parsePeriod(minTimeAgoStr);
- latest = current.minus(minTimeAgo);
+ latest = compactionStartTime.minus(minTimeAgo);
if (earliest.isBefore(folderTime) && latest.isAfter(folderTime)) {
log.debug("{} falls in the user defined time range", dataset.datasetRoot());
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 45b2f40..2a7d311 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator;
+import org.apache.gobblin.runtime.listeners.CompositeJobListener;
+import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
@@ -98,6 +100,7 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
private static final Logger LOG = Logger.getLogger(AzkabanJobLauncher.class);
public static final String GOBBLIN_LOG_LEVEL_KEY = "gobblin.log.levelOverride";
+ public static final String GOBBLIN_CUSTOM_JOB_LISTENERS = "gobblin.custom.job.listeners";
public static final String TEMPLATE_KEY = "gobblin.template.uri";
private static final String HADOOP_FS_DEFAULT_NAME = "fs.default.name";
@@ -115,7 +118,8 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
private final Closer closer = Closer.create();
private final JobLauncher jobLauncher;
- private final JobListener jobListener = new EmailNotificationJobListener();
+ private final JobListener jobListener;
+
private final Properties props;
private final ApplicationLauncher applicationLauncher;
private final long ownAzkabanSla;
@@ -134,6 +138,9 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
this.props = new Properties();
this.props.putAll(props);
+ // initialize job listeners after properties has been initialized
+ this.jobListener = initJobListener();
+
// load dynamic configuration and add them to the job properties
Config propsAsConfig = ConfigUtils.propertiesToConfig(props);
DynamicConfigGenerator dynamicConfigGenerator =
@@ -217,6 +224,21 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
this.closer.register(new ServiceBasedAppLauncher(jobProps, "Azkaban-" + UUID.randomUUID()));
}
+ private JobListener initJobListener() {
+ CompositeJobListener compositeJobListener = new CompositeJobListener();
+ List<String> listeners = new State(props).getPropAsList(GOBBLIN_CUSTOM_JOB_LISTENERS, EmailNotificationJobListener.class.getSimpleName());
+ try {
+ for (String listenerAlias: listeners) {
+ ClassAliasResolver<JobListener> conditionClassAliasResolver = new ClassAliasResolver<>(JobListener.class);
+ compositeJobListener.addJobListener(conditionClassAliasResolver.resolveClass(listenerAlias).newInstance());
+ }
+ } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
+ throw new IllegalArgumentException(e);
+ }
+
+ return compositeJobListener;
+ }
+
@Override
public void run()
throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/CompositeJobListener.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/CompositeJobListener.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/CompositeJobListener.java
new file mode 100644
index 0000000..bfdc4c8
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/CompositeJobListener.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.listeners;
+
+import java.util.List;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.gobblin.runtime.JobContext;
+
+import com.google.common.collect.Lists;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+@AllArgsConstructor
+public class CompositeJobListener extends AbstractJobListener {
+ private List<JobListener> listeners = Lists.newArrayList();
+
+ public CompositeJobListener() {
+ }
+
+ public void addJobListener(JobListener listener) {
+ this.listeners.add(listener);
+ }
+
+
+ @Override
+ public void onJobPrepare(JobContext jobContext) throws Exception {
+ StringBuffer buf = new StringBuffer();
+ for (JobListener listener: listeners) {
+ try {
+ listener.onJobPrepare(jobContext);
+ } catch (Exception e) {
+ buf.append(listener.getClass().getName() + ":" + e.toString());
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ String exceptions = buf.toString();
+ if (!exceptions.isEmpty()) {
+ throw new RuntimeException(exceptions);
+ }
+ }
+
+ @Override
+ public void onJobStart(JobContext jobContext) throws Exception {
+ StringBuffer buf = new StringBuffer();
+ for (JobListener listener: listeners) {
+ try {
+ listener.onJobStart(jobContext);
+ } catch (Exception e) {
+ buf.append(listener.getClass().getName() + ":" + e.toString());
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ String exceptions = buf.toString();
+ if (!exceptions.isEmpty()) {
+ throw new RuntimeException(exceptions);
+ }
+ }
+
+ @Override
+ public void onJobCompletion(JobContext jobContext) throws Exception {
+ StringBuffer buf = new StringBuffer();
+ for (JobListener listener: listeners) {
+ try {
+ listener.onJobCompletion(jobContext);
+ } catch (Exception e) {
+ buf.append(listener.getClass().getName() + ":" + e.toString());
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ String exceptions = buf.toString();
+ if (!exceptions.isEmpty()) {
+ throw new RuntimeException(exceptions);
+ }
+ }
+
+ @Override
+ public void onJobCancellation(JobContext jobContext) throws Exception {
+ StringBuffer buf = new StringBuffer();
+ for (JobListener listener: listeners) {
+ try {
+ listener.onJobCancellation(jobContext);
+ } catch (Exception e) {
+ buf.append(listener.getClass().getName() + ":" + e.toString());
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ String exceptions = buf.toString();
+ if (!exceptions.isEmpty()) {
+ throw new RuntimeException(exceptions);
+ }
+ }
+
+ @Override
+ public void onJobFailure(JobContext jobContext) throws Exception {
+ StringBuffer buf = new StringBuffer();
+ for (JobListener listener: listeners) {
+ try {
+ listener.onJobFailure(jobContext);
+ } catch (Exception e) {
+ buf.append(listener.getClass().getName() + ":" + e.toString());
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ String exceptions = buf.toString();
+ if (!exceptions.isEmpty()) {
+ throw new RuntimeException(exceptions);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
index 3106f4d..feb0d92 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.runtime.listeners;
import org.apache.commons.mail.EmailException;
+import org.apache.gobblin.annotation.Alias;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +33,7 @@ import org.apache.gobblin.util.EmailUtils;
*
* @author Yinan Li
*/
+@Alias("EmailNotificationJobListener")
public class EmailNotificationJobListener extends AbstractJobListener {
private static final Logger LOGGER = LoggerFactory.getLogger(EmailNotificationJobListener.class);