You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/08/22 05:55:18 UTC

incubator-gobblin git commit: [GOBBLIN-177] Added error limit for records failed during conversion in batch execution

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 784d7106f -> 4cab75d15


[GOBBLIN-177] Added error limit for records failed during conversion in batch execution

Dear Gobblin maintainers,

Please accept this PR. I understand that it will
not be reviewed until I have checked off all the
steps below!

### JIRA
- [ ] My PR addresses the following [Gobblin JIRA]
(https://issues.apache.org/jira/browse/GOBBLIN/)
issues and references them in the PR title. For
example, "[GOBBLIN-XXX] My Gobblin PR"
    -
https://issues.apache.org/jira/browse/GOBBLIN-177

### Description
- [ ] Here are some details about my PR, including
screenshots (if applicable):
Modified Task.java to have a configurable error
limit for DataConversionExceptions.
Also added an integration test to verify the same.

### Tests
- [ ] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
Added test TaskSkipErrRecordsIntegrationTest

### Commits
- [ ] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit
message](http://chris.beams.io/posts/git-
commit/)":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not
"adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Closes #2065 from aditya1105/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/4cab75d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/4cab75d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/4cab75d1

Branch: refs/heads/master
Commit: 4cab75d158b2db6b5aaac7b3c253283330ddde6d
Parents: 784d710
Author: aditya1105 <ad...@linkedin.com>
Authored: Mon Aug 21 22:55:11 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Mon Aug 21 22:55:11 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/gobblin/runtime/Task.java   | 17 +++++-
 .../gobblin/runtime/TaskConfigurationKeys.java  |  3 +
 .../TaskSkipErrRecordsIntegrationTest.java      | 63 ++++++++++++++++++++
 .../org/apache/gobblin/TestAvroConverter.java   | 33 ++++++++++
 .../task_skip_err_records.properties            | 42 +++++++++++++
 5 files changed, 156 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 20b183f..69c9a1c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.BooleanUtils;
+import org.apache.gobblin.converter.DataConversionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -416,10 +417,22 @@ public class Task implements TaskIFace {
     } else {
       RecordEnvelope record;
       // Extract, convert, and fork one source record at a time.
+      long errRecords = 0;
       while ((record = extractor.readRecordEnvelope()) != null) {
         onRecordExtract();
-        for (Object convertedRecord : converter.convertRecord(schema, record.getRecord(), this.taskState)) {
-          processRecord(convertedRecord, forkOperator, rowChecker, rowResults, branches, null);
+        try {
+          for (Object convertedRecord : converter.convertRecord(schema, record.getRecord(), this.taskState)) {
+            processRecord(convertedRecord, forkOperator, rowChecker, rowResults, branches, null);
+          }
+        } catch (Exception e) {
+          if (!(e instanceof DataConversionException) && !(e.getCause() instanceof DataConversionException)) {
+            throw new RuntimeException(e.getCause());
+          }
+          errRecords++;
+          if (errRecords > this.taskState.getPropAsLong(TaskConfigurationKeys.TASK_SKIP_ERROR_RECORDS,
+              TaskConfigurationKeys.DEFAULT_TASK_SKIP_ERROR_RECORDS)) {
+            throw new RuntimeException(e);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java
index 33e6fba..8325781 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java
@@ -37,4 +37,7 @@ public class TaskConfigurationKeys {
   public static final String TASK_IS_SINGLE_BRANCH_SYNCHRONOUS = "gobblin.task.is.single.branch.synchronous";
   public static final String DEFAULT_TASK_IS_SINGLE_BRANCH_SYNCHRONOUS = Boolean.toString(false);
 
+
+  public static final String TASK_SKIP_ERROR_RECORDS = "task.skip.error.records";
+  public static final long DEFAULT_TASK_SKIP_ERROR_RECORDS = 0;
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java
new file mode 100644
index 0000000..11eafef
--- /dev/null
+++ b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java
@@ -0,0 +1,63 @@
+package org.apache.gobblin;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobException;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+
+@Test
+public class TaskSkipErrRecordsIntegrationTest {
+  private static final String SAMPLE_FILE = "test.avro";
+  public static final String TASK_SKIP_ERROR_RECORDS = "task.skip.error.records";
+  public static final String ONE = "1";
+  public static final String ZERO = "0";
+
+  @BeforeTest
+  @AfterTest
+  public void cleanDir()
+      throws IOException {
+    GobblinLocalJobLauncherUtils.cleanDir();
+  }
+
+  /**
+   * The converter throws a DataConversionException while trying to convert the
+   * first record. Since task.skip.error.records is set to 0, this job should fail.
+   */
+  @Test(expectedExceptions = JobException.class)
+  public void skipZeroErrorRecordTest()
+      throws Exception {
+    Properties jobProperties = getProperties();
+    jobProperties.setProperty(TASK_SKIP_ERROR_RECORDS, ZERO);
+    GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+  }
+
+  /**
+   * The converter throws a DataConversionException while trying to convert the
+   * first record. Since task.skip.error.records is set to 1, this job should succeed.
+   */
+  @Test
+  public void skipOneErrorRecordTest()
+      throws Exception {
+    Properties jobProperties = getProperties();
+    jobProperties.setProperty(TASK_SKIP_ERROR_RECORDS, ONE);
+    GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+  }
+
+  private Properties getProperties()
+      throws IOException {
+    Properties jobProperties =
+        GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/task_skip_err_records.properties");
+    FileUtils.copyFile(new File(GobblinLocalJobLauncherUtils.RESOURCE_DIR + SAMPLE_FILE),
+        new File(GobblinLocalJobLauncherUtils.RESOURCE_DIR + GobblinLocalJobLauncherUtils.SAMPLE_DIR + SAMPLE_FILE));
+    jobProperties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
+        GobblinLocalJobLauncherUtils.RESOURCE_DIR + GobblinLocalJobLauncherUtils.SAMPLE_DIR + SAMPLE_FILE);
+    return jobProperties;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-test-harness/src/test/java/org/apache/gobblin/TestAvroConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-test-harness/src/test/java/org/apache/gobblin/TestAvroConverter.java b/gobblin-test-harness/src/test/java/org/apache/gobblin/TestAvroConverter.java
new file mode 100644
index 0000000..d0eddba
--- /dev/null
+++ b/gobblin-test-harness/src/test/java/org/apache/gobblin/TestAvroConverter.java
@@ -0,0 +1,33 @@
+package org.apache.gobblin;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.Converter;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.converter.SingleRecordIterable;
+
+
+/**
+ * Test converter that throws a DataConversionException while converting the first record.
+ */
+public class TestAvroConverter extends Converter<Schema, Schema, GenericRecord, GenericRecord> {
+  private long recordCount = 0;
+
+  @Override
+  public Schema convertSchema(Schema inputSchema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    return inputSchema;
+  }
+
+  @Override
+  public Iterable<GenericRecord> convertRecord(Schema outputSchema, GenericRecord inputRecord, WorkUnitState workUnit)
+      throws DataConversionException {
+    recordCount++;
+    if (recordCount == 1) {
+      throw new DataConversionException("Unable to convert record");
+    }
+    return new SingleRecordIterable<GenericRecord>(inputRecord);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-test-harness/src/test/resources/runtime_test/task_skip_err_records.properties
----------------------------------------------------------------------
diff --git a/gobblin-test-harness/src/test/resources/runtime_test/task_skip_err_records.properties b/gobblin-test-harness/src/test/resources/runtime_test/task_skip_err_records.properties
new file mode 100644
index 0000000..7e94ba8
--- /dev/null
+++ b/gobblin-test-harness/src/test/resources/runtime_test/task_skip_err_records.properties
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+job.name=task_skip_err_records.job
+job.commit.policy=full
+
+job.lock.enabled=false
+
+extract.namespace=data.writerOutput
+extract.table.type=snapshot_append
+
+writer.destination.type=HDFS
+writer.eager.initialization=true
+writer.file.path=output
+writer.output.format=AVRO
+
+state.store.dir=./gobblin-test-harness/src/test/resources/runtime_test/state_store
+writer.staging.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_staging
+writer.output.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_output
+data.publisher.final.dir=./gobblin-test-harness/src/test/resources/runtime_test/final_dir
+
+
+source.class=org.apache.gobblin.TestAvroSource
+task.skip.error.records=1
+converter.classes=org.apache.gobblin.TestAvroConverter
+publish.data.at.job.level=true
+pubisher.class=org.apache.gobblin.publisher.BaseDataPublisher
+task.maxretries=0
\ No newline at end of file