You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/04/16 23:22:40 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-731] Make deserialization of FlowSpec more robust

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b373031  [GOBBLIN-731] Make deserialization of FlowSpec more robust
b373031 is described below

commit b37303182212d96d42b336869ffe5b0f002c6595
Author: autumnust <le...@linkedin.com>
AuthorDate: Tue Apr 16 16:20:26 2019 -0700

    [GOBBLIN-731] Make deserialization of FlowSpec more robust
    
    Dear Gobblin maintainers,
    
    Please accept this PR. I understand that it will
    not be reviewed until I have checked off all the
    steps below!
    
    ### JIRA
    - [ ] My PR addresses the following [Gobblin JIRA]
    (https://issues.apache.org/jira/browse/GOBBLIN-731)
    issues and references them in the PR title.
    
    ### Description
    
    Catch exceptions when loading a `FlowSpec` from the
    FileSystem, so that even if some of the spec files
    are problematic, the rest of them will still be
    successfully deserialized.
    
    Adding unit test for that.
    
    Removing some unnecessary modifiers while reading
    through the code.
    
    ### Tests
    - [x] My PR adds the following unit tests __OR__
    does not need testing for this extremely good
    reason:
    
    ### Commits
    - [x] My commits all reference JIRA issues in
    their subject lines, and I have squashed multiple
    commits if they address the same issue. In
    addition, my commits follow the guidelines from
    "[How to write a good git commit
    message](http://chris.beams.io/posts/git-
    commit/)":
        1. Subject is separated from body by a blank line
        2. Subject is limited to 50 characters
        3. Subject does not end with a period
        4. Subject uses the imperative mood ("add", not
    "adding")
        5. Body wraps at 72 characters
        6. Body explains "what" and "why", not "how"
    
    Make deserialization of spec more robust
    
    Address comments on adding serDeException in test
    
    Remove mistakenly brought imports
    
    Closes #2598 from autumnust/flowSpecSerDeRobust
---
 .../instrumented/StandardMetricsBridge.java        |  2 +-
 .../gobblin/runtime/api/MutableSpecCatalog.java    |  2 +-
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  | 17 ++--
 .../gobblin/runtime/spec_store/FSSpecStore.java    | 18 +++-
 .../gobblin/spec_catalog/FlowCatalogTest.java      | 96 ++++++++++++++++++----
 5 files changed, 108 insertions(+), 27 deletions(-)

diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
index b371fe1..5e5f4f9 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
@@ -40,7 +40,7 @@ public interface StandardMetricsBridge {
     return ImmutableList.of();
   }
 
-  public class StandardMetrics implements MetricSet {
+  class StandardMetrics implements MetricSet {
     protected final List<ContextAwareMetric> contextAwareMetrics;
     protected final Map<String, Metric> rawMetrics;
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index 2ddbf7e..b64fb17 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -57,7 +57,7 @@ public interface MutableSpecCatalog extends SpecCatalog {
   void remove(URI uri, Properties headers) throws SpecNotFoundException;
 
   @Slf4j
-  public static class MutableStandardMetrics extends StandardMetrics {
+  class MutableStandardMetrics extends StandardMetrics {
     public static final String TIME_FOR_SPEC_CATALOG_REMOVE = "timeForSpecCatalogRemove";
     public static final String TIME_FOR_SPEC_CATALOG_PUT = "timeForSpecCatalogPut";
     @Getter private final ContextAwareTimer timeForSpecCatalogPut;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index a7db4fd..ddf8cbe 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -205,24 +205,25 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
   /* Catalog core functionality                     *
   /**************************************************/
 
+  /**
+   * Get all specs from {@link SpecStore}
+   */
   @Override
   public Collection<Spec> getSpecs() {
     try {
       return specStore.getSpecs();
+      // TODO: Add some kind of metrics to keep track of specs that failed to be deserialized.
+
     } catch (IOException e) {
       throw new RuntimeException("Cannot retrieve Specs from Spec store", e);
     }
   }
 
   public Collection<Spec> getSpecsWithTimeUpdate() {
-    try {
-      long startTime = System.currentTimeMillis();
-      Collection<Spec> specs = specStore.getSpecs();
-      this.metrics.updateGetSpecTime(startTime);
-      return specs;
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot retrieve Specs from Spec store", e);
-    }
+    long startTime = System.currentTimeMillis();
+    Collection<Spec> specs = this.getSpecs();
+    this.metrics.updateGetSpecTime(startTime);
+    return specs;
   }
 
   public boolean exists(URI uri) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index d87d6d4..0e2c482 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -238,13 +238,27 @@ public class FSSpecStore implements SpecStore {
     return specs;
   }
 
-  private void getSpecs(Path directory, Collection<Spec> specs) throws IOException {
+  /**
+   * For multiple {@link FlowSpec}s to be loaded, catch Exceptions when one of them failed to be loaded and
+   * continue with the rest.
+   *
+   * The {@link IOException} thrown from a standard FileSystem call will be propagated, while file-specific
+   * exceptions will be caught to ensure that the other files can still be deserialized.
+   *
+   * @param directory The directory that contains specs to be deserialized
+   * @param specs Container of specs.
+   */
+  private void getSpecs(Path directory, Collection<Spec> specs) throws Exception {
     FileStatus[] fileStatuses = fs.listStatus(directory);
     for (FileStatus fileStatus : fileStatuses) {
       if (fileStatus.isDirectory()) {
         getSpecs(fileStatus.getPath(), specs);
       } else {
-        specs.add(readSpecFromFile(fileStatus.getPath()));
+        try {
+          specs.add(readSpecFromFile(fileStatus.getPath()));
+        } catch (Exception e) {
+          log.warn(String.format("Path[%s] cannot be correctly deserialized as Spec", fileStatus.getPath()), e);
+        }
       }
     }
   }
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
index 537cdbe..2d73290 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
@@ -17,15 +17,37 @@
 
 package org.apache.gobblin.spec_catalog;
 
+import com.google.common.base.Optional;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.typesafe.config.Config;
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Properties;
-
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.runtime.spec_store.FSSpecStore;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -33,20 +55,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Optional;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-
 
 public class FlowCatalogTest {
   private static final Logger logger = LoggerFactory.getLogger(FlowCatalog.class);
@@ -117,6 +125,64 @@ public class FlowCatalogTest {
     }
   }
 
+  /**
+   * Make sure that when one spec fails to be deserialized, the rest of the specs in the specStore can
+   * still be taken care of.
+   */
+  @Test
+  public void testGetSpecRobustness() throws Exception {
+
+    File specDir = Files.createTempDir();
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, specDir.getAbsolutePath());
+    SpecSerDe serde = Mockito.mock(SpecSerDe.class);
+    TestFsSpecStore fsSpecStore = new TestFsSpecStore(ConfigUtils.propertiesToConfig(properties), serde);
+
+    // Version is specified as 0,1,2
+    File specFileFail = new File(specDir, "spec_fail");
+    Assert.assertTrue(specFileFail.createNewFile());
+    File specFile1 = new File(specDir, "spec0");
+    Assert.assertTrue(specFile1.createNewFile());
+    File specFile2 = new File(specDir, "spec1");
+    Assert.assertTrue(specFile2.createNewFile());
+    File specFile3 = new File(specDir, "serDeFail");
+    Assert.assertTrue(specFile3.createNewFile());
+
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Assert.assertEquals(fs.getFileStatus(new Path(specFile3.getAbsolutePath())).getLen(), 0);
+
+    Collection<Spec> specList = fsSpecStore.getSpecs();
+    // The fail and serDe datasets wouldn't survive
+    Assert.assertEquals(specList.size(), 2);
+    for (Spec spec: specList) {
+      Assert.assertTrue(!spec.getDescription().contains("spec_fail"));
+      Assert.assertTrue(!spec.getDescription().contains("serDeFail"));
+    }
+  }
+
+  class TestFsSpecStore extends FSSpecStore {
+    public TestFsSpecStore(Config sysConfig, SpecSerDe specSerDe) throws IOException {
+      super(sysConfig, specSerDe);
+    }
+
+    @Override
+    protected Spec readSpecFromFile(Path path) throws IOException {
+      if (path.getName().contains("fail")) {
+        throw new IOException("Mean to fail in the test");
+      } else if (path.getName().contains("serDeFail")) {
+
+        // Simulate the way that a serDe exception would be thrown
+        FSDataInputStream fis = fs.open(path);
+        SerializationUtils.deserialize(ByteStreams.toByteArray(fis));
+
+        // This line should never be reached since we generate SerDe Exception on purpose.
+        Assert.assertTrue(false);
+        return null;
+      }
+      else return initFlowSpec();
+    }
+  }
+
   @Test
   public void createFlowSpec() {
     // List Current Specs