You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/08/22 05:55:18 UTC
incubator-gobblin git commit: [GOBBLIN-177] Added error limit for
records failed during conversion in batch execution
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 784d7106f -> 4cab75d15
[GOBBLIN-177] Added error limit for records failed during conversion in batch execution
Dear Gobblin maintainers,
Please accept this PR. I understand that it will
not be reviewed until I have checked off all the
steps below!
### JIRA
- [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/)
issues and references them in the PR title. For
example, "[GOBBLIN-XXX] My Gobblin PR"
-
https://issues.apache.org/jira/browse/GOBBLIN-177
### Description
- [ ] Here are some details about my PR, including
screenshots (if applicable):
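Modified Task.java so that the number of records allowed to fail conversion with a
DataConversionException is configurable (task.skip.error.records, default 0). Such
records are skipped until the limit is exceeded, at which point the task fails.
Also added an integration test to verify this behavior.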
### Tests
- [ ] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
Added the integration test TaskSkipErrRecordsIntegrationTest, which uses the new TestAvroConverter to fail the first record.
### Commits
- [ ] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
1. Subject is separated from body by a blank line
2. Subject is limited to 50 characters
3. Subject does not end with a period
4. Subject uses the imperative mood ("add", not
"adding")
5. Body wraps at 72 characters
6. Body explains "what" and "why", not "how"
Closes #2065 from aditya1105/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/4cab75d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/4cab75d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/4cab75d1
Branch: refs/heads/master
Commit: 4cab75d158b2db6b5aaac7b3c253283330ddde6d
Parents: 784d710
Author: aditya1105 <ad...@linkedin.com>
Authored: Mon Aug 21 22:55:11 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Mon Aug 21 22:55:11 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/gobblin/runtime/Task.java | 17 +++++-
.../gobblin/runtime/TaskConfigurationKeys.java | 3 +
.../TaskSkipErrRecordsIntegrationTest.java | 63 ++++++++++++++++++++
.../org/apache/gobblin/TestAvroConverter.java | 33 ++++++++++
.../task_skip_err_records.properties | 42 +++++++++++++
5 files changed, 156 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 20b183f..69c9a1c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.BooleanUtils;
+import org.apache.gobblin.converter.DataConversionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -416,10 +417,22 @@ public class Task implements TaskIFace {
} else {
RecordEnvelope record;
// Extract, convert, and fork one source record at a time.
+ long errRecords = 0; // records dropped so far because their conversion failed
+ long maxErrRecords = this.taskState.getPropAsLong(TaskConfigurationKeys.TASK_SKIP_ERROR_RECORDS,
+ TaskConfigurationKeys.DEFAULT_TASK_SKIP_ERROR_RECORDS); // read once instead of on every failure
while ((record = extractor.readRecordEnvelope()) != null) {
onRecordExtract();
- for (Object convertedRecord : converter.convertRecord(schema, record.getRecord(), this.taskState)) {
- processRecord(convertedRecord, forkOperator, rowChecker, rowResults, branches, null);
+ try {
+ for (Object convertedRecord : converter.convertRecord(schema, record.getRecord(), this.taskState)) {
+ processRecord(convertedRecord, forkOperator, rowChecker, rowResults, branches, null);
+ }
+ } catch (Exception e) {
+ if (!(e instanceof DataConversionException) && !(e.getCause() instanceof DataConversionException)) {
+ throw new RuntimeException(e); // not a conversion failure: wrap e itself, e.getCause() may be null
+ }
+ if (++errRecords > maxErrRecords) { // otherwise skip this record and continue with the next one
+ throw new RuntimeException("Records failing conversion exceeded " + TaskConfigurationKeys.TASK_SKIP_ERROR_RECORDS + "=" + maxErrRecords, e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java
index 33e6fba..8325781 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java
@@ -37,4 +37,7 @@ public class TaskConfigurationKeys {
public static final String TASK_IS_SINGLE_BRANCH_SYNCHRONOUS = "gobblin.task.is.single.branch.synchronous";
public static final String DEFAULT_TASK_IS_SINGLE_BRANCH_SYNCHRONOUS = Boolean.toString(false);
+ // Max records a task may skip after a DataConversionException before it fails; 0 means fail on the first one.
+ public static final String TASK_SKIP_ERROR_RECORDS = "task.skip.error.records";
+ public static final long DEFAULT_TASK_SKIP_ERROR_RECORDS = 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java
new file mode 100644
index 0000000..11eafef
--- /dev/null
+++ b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java
@@ -0,0 +1,63 @@
+package org.apache.gobblin;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobException;
+import org.apache.gobblin.runtime.TaskConfigurationKeys;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+/**
+ * Runs a local job whose converter ({@link TestAvroConverter}) throws a DataConversionException on the first
+ * record, and checks that {@link TaskConfigurationKeys#TASK_SKIP_ERROR_RECORDS} decides whether the job fails.
+ */
+@Test
+public class TaskSkipErrRecordsIntegrationTest {
+ private static final String SAMPLE_FILE = "test.avro";
+ // Reference the runtime's key rather than copying the literal, so this test cannot drift from Task.
+ public static final String TASK_SKIP_ERROR_RECORDS = TaskConfigurationKeys.TASK_SKIP_ERROR_RECORDS;
+ public static final String ONE = "1";
+ public static final String ZERO = "0";
+
+ @BeforeTest
+ @AfterTest
+ public void cleanDir()
+ throws IOException {
+ GobblinLocalJobLauncherUtils.cleanDir();
+ }
+
+ /** The first record fails conversion and task.skip.error.records=0 allows no skips, so the job must fail. */
+ @Test(expectedExceptions = JobException.class)
+ public void skipZeroErrorRecordTest()
+ throws Exception {
+ Properties jobProperties = getProperties();
+ jobProperties.setProperty(TASK_SKIP_ERROR_RECORDS, ZERO);
+ GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+ }
+
+ /** The first record fails conversion but task.skip.error.records=1 allows one skip, so the job must succeed. */
+ @Test
+ public void skipOneErrorRecordTest()
+ throws Exception {
+ Properties jobProperties = getProperties();
+ jobProperties.setProperty(TASK_SKIP_ERROR_RECORDS, ONE);
+ GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+ }
+
+ /** Loads the job config and stages the sample Avro file as the single file for the source to pull. */
+ private Properties getProperties()
+ throws IOException {
+ Properties jobProperties =
+ GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/task_skip_err_records.properties");
+ FileUtils.copyFile(new File(GobblinLocalJobLauncherUtils.RESOURCE_DIR + SAMPLE_FILE),
+ new File(GobblinLocalJobLauncherUtils.RESOURCE_DIR + GobblinLocalJobLauncherUtils.SAMPLE_DIR + SAMPLE_FILE));
+ jobProperties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
+ GobblinLocalJobLauncherUtils.RESOURCE_DIR + GobblinLocalJobLauncherUtils.SAMPLE_DIR + SAMPLE_FILE);
+ return jobProperties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-test-harness/src/test/java/org/apache/gobblin/TestAvroConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-test-harness/src/test/java/org/apache/gobblin/TestAvroConverter.java b/gobblin-test-harness/src/test/java/org/apache/gobblin/TestAvroConverter.java
new file mode 100644
index 0000000..d0eddba
--- /dev/null
+++ b/gobblin-test-harness/src/test/java/org/apache/gobblin/TestAvroConverter.java
@@ -0,0 +1,33 @@
+package org.apache.gobblin;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.Converter;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.converter.SingleRecordIterable;
+
+
+/**
+ * Pass-through test converter: the first record it is asked to convert fails with a
+ * {@link DataConversionException}; every later record, and the schema, are returned unchanged.
+ */
+public class TestAvroConverter extends Converter<Schema, Schema, GenericRecord, GenericRecord> {
+ private long seenRecords = 0;
+
+ @Override
+ public Schema convertSchema(Schema inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
+ return inputSchema;
+ }
+
+ @Override
+ public Iterable<GenericRecord> convertRecord(Schema outputSchema, GenericRecord inputRecord, WorkUnitState workUnit)
+ throws DataConversionException {
+ // Count before checking, so the very first call (count == 1) is the one that fails.
+ if (++seenRecords == 1) {
+ throw new DataConversionException("Unable to convert record");
+ }
+ return new SingleRecordIterable<>(inputRecord);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-test-harness/src/test/resources/runtime_test/task_skip_err_records.properties
----------------------------------------------------------------------
diff --git a/gobblin-test-harness/src/test/resources/runtime_test/task_skip_err_records.properties b/gobblin-test-harness/src/test/resources/runtime_test/task_skip_err_records.properties
new file mode 100644
index 0000000..7e94ba8
--- /dev/null
+++ b/gobblin-test-harness/src/test/resources/runtime_test/task_skip_err_records.properties
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+job.name=task_skip_err_records.job
+job.commit.policy=full
+
+job.lock.enabled=false
+
+extract.namespace=data.writerOutput
+extract.table.type=snapshot_append
+
+writer.destination.type=HDFS
+writer.eager.initialization=true
+writer.file.path=output
+writer.output.format=AVRO
+
+state.store.dir=./gobblin-test-harness/src/test/resources/runtime_test/state_store
+writer.staging.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_staging
+writer.output.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_output
+data.publisher.final.dir=./gobblin-test-harness/src/test/resources/runtime_test/final_dir
+
+# TestAvroConverter fails the first record it converts; task.skip.error.records=1 lets the job skip it (tests override it).
+source.class=org.apache.gobblin.TestAvroSource
+task.skip.error.records=1
+converter.classes=org.apache.gobblin.TestAvroConverter
+publish.data.at.job.level=true
+data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
+task.maxretries=0
\ No newline at end of file