You are viewing a plain-text version of this content. The canonical link to it ("here") was a hyperlink that is not preserved in this copy.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/19 17:29:41 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-726] enable schema check for ticket ETL-8753,

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 620fd6d  [GOBBLIN-726] enable schema check for ticket ETL-8753,
620fd6d is described below

commit 620fd6de6d0dd51f438326115fb1c0618865dc12
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Fri Apr 19 10:29:35 2019 -0700

    [GOBBLIN-726] enable schema check for ticket ETL-8753,
    
    Closes #2593 from ZihanLi58/checkSchema
---
 .../gobblin/configuration/ConfigurationKeys.java   |  1 +
 .../management/copy/SchemaCheckedCopySource.java   | 40 +++++++++++++
 .../extractor/FileAwareInputStreamExtractor.java   | 43 +++++++------
 ...leAwareInputStreamExtractorWithCheckSchema.java | 70 ++++++++++++++++++++++
 .../embedded/EmbeddedGobblinDistcpTest.java        | 64 ++++++++++++++++++++
 5 files changed, 199 insertions(+), 19 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index fc648be..8cdddb8 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -539,6 +539,7 @@ public class ConfigurationKeys {
    * Configuration properties used by the CopySource.
    */
   public static final String COPY_SOURCE_FILESET_WU_GENERATOR_CLASS = "copy.source.fileset.wu.generator.class";
+  public static final String COPY_EXPECTED_SCHEMA = "gobblin.copy.expectedSchema"; // Avro schema JSON checked by SchemaCheckedCopySource: each file's Schema.toString() must equal it exactly
 
   /**
    * Configuration properties used by the FileBasedExtractor
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/SchemaCheckedCopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/SchemaCheckedCopySource.java
new file mode 100644
index 0000000..0c197d7
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/SchemaCheckedCopySource.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractorWithCheckSchema;
+import org.apache.gobblin.source.extractor.Extractor;
+
+
+/**
+ * A {@link CopySource} that checks the Avro schema of each copied file before copying it.
+ *
+ * <p>Files are read through a {@link FileAwareInputStreamExtractorWithCheckSchema}, which compares the file's
+ * schema with the JSON configured under {@code gobblin.copy.expectedSchema} and throws instead of producing the
+ * file's record when they differ. The copied files are therefore expected to be Avro data files.
+ */
+public class SchemaCheckedCopySource extends CopySource {
+
+  /**
+   * Returns a {@link FileAwareInputStreamExtractorWithCheckSchema} instead of the extractor {@link CopySource}
+   * would create, so every file goes through the schema check.
+   */
+  @Override
+  protected Extractor<String, FileAwareInputStream> extractorForCopyableFile(FileSystem fs, CopyableFile cf,
+      WorkUnitState state)
+      throws IOException {
+    return new FileAwareInputStreamExtractorWithCheckSchema(fs, cf, state);
+  }
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
index 9863a98..6a5a34d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
@@ -48,11 +48,11 @@ import org.apache.gobblin.util.io.MeteredInputStream;
  */
 public class FileAwareInputStreamExtractor implements Extractor<String, FileAwareInputStream> {
 
-  private final FileSystem fs;
-  private final CopyableFile file;
-  private final WorkUnitState state;
+  protected final FileSystem fs; // protected (was private) so subclasses can reuse the extractor's inputs
+  protected final CopyableFile file; // read by subclasses, e.g. FileAwareInputStreamExtractorWithCheckSchema
+  protected final WorkUnitState state; // may be null (this class null-checks it before use); subclasses must too
   /** True indicates the unique record has already been read. */
-  private boolean recordRead;
+  protected boolean recordRead; // NOTE(review): only buildStream writes this and no subclass in this change uses it; private may suffice
 
   public FileAwareInputStreamExtractor(FileSystem fs, CopyableFile file, WorkUnitState state) {
     this.fs = fs;
@@ -83,24 +83,50 @@ public class FileAwareInputStreamExtractor implements Extractor<String, FileAwar
       Configuration conf =
           this.state == null ? HadoopUtils.newConfiguration() : HadoopUtils.getConfFromState(this.state);
       FileSystem fsFromFile = this.file.getOrigin().getPath().getFileSystem(conf);
-      this.recordRead = true;
-      FileAwareInputStream.FileAwareInputStreamBuilder builder = FileAwareInputStream.builder().file(this.file);
-      if (this.file.getFileStatus().isDirectory()) {
-        return builder.inputStream(EmptyInputStream.instance).build();
-      }
+      return buildStream(fsFromFile);
+    }
+    return null;
+  }
 
-      FSDataInputStream dataInputStream = fsFromFile.open(this.file.getFileStatus().getPath());
-      if (this.state != null && DistcpFileSplitter.isSplitWorkUnit(this.state)) {
-        Optional<DistcpFileSplitter.Split> split = DistcpFileSplitter.getSplit(this.state);
-        builder.split(split);
-        if (split.isPresent()) {
-          dataInputStream.seek(split.get().getLowPosition());
-        }
+  /**
+   * Builds the single {@link FileAwareInputStream} record for {@link #file} and marks it as read.
+   *
+   * <p>A directory yields a record with an empty stream. For split work units the opened stream is positioned at
+   * the split's low offset. Subclasses may override this hook, e.g. to validate the file before delegating here.
+   *
+   * @param fsFromFile file system used to open the file (the caller resolves it from the file's origin path)
+   * @return the record for the file: an empty stream for directories, otherwise a metered stream over the file
+   * @throws DataRecordException not thrown here; declared so overriding subclasses can reject the file
+   * @throws IOException if the file cannot be opened or positioned at the split offset
+   */
+  protected FileAwareInputStream buildStream(FileSystem fsFromFile)
+      throws DataRecordException, IOException {
+    this.recordRead = true;
+    FileAwareInputStream.FileAwareInputStreamBuilder builder = FileAwareInputStream.builder().file(this.file);
+    if (this.file.getFileStatus().isDirectory()) {
+      return builder.inputStream(EmptyInputStream.instance).build();
+    }
+
+    FSDataInputStream dataInputStream = fsFromFile.open(this.file.getFileStatus().getPath());
+    if (this.state != null && DistcpFileSplitter.isSplitWorkUnit(this.state)) {
+      Optional<DistcpFileSplitter.Split> split = DistcpFileSplitter.getSplit(this.state);
+      builder.split(split);
+      if (split.isPresent()) {
+        try {
+          dataInputStream.seek(split.get().getLowPosition());
+        } catch (IOException e) {
+          // The caller never receives this stream on failure, so release the open handle before rethrowing.
+          try {
+            dataInputStream.close();
+          } catch (IOException closeFailure) {
+            e.addSuppressed(closeFailure);
+          }
+          throw e;
+        }
       }
-      builder.inputStream(MeteredInputStream.builder().in(dataInputStream).build());
-      return builder.build();
     }
-    return null;
+    builder.inputStream(MeteredInputStream.builder().in(dataInputStream).build());
+    return builder.build();
   }
 
   /**
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
new file mode 100644
index 0000000..34c4ab2
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.extractor;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.source.extractor.DataRecordException;
+
+/**
+ * A {@link FileAwareInputStreamExtractor} that verifies the Avro schema of a file before exposing its
+ * {@link InputStream}. The file's schema must equal the JSON configured under
+ * {@link ConfigurationKeys#COPY_EXPECTED_SCHEMA}; otherwise {@link #buildStream} throws a
+ * {@link DataRecordException} and no record is produced, so the file is not copied.
+ *
+ * <p>Directories carry no schema and are handed to the parent class unchecked. Every regular file is expected to be
+ * an Avro data file.
+ */
+public class FileAwareInputStreamExtractorWithCheckSchema extends FileAwareInputStreamExtractor {
+
+  /**
+   * Creates a schema-checking extractor.
+   *
+   * @param fs file system passed through to {@link FileAwareInputStreamExtractor}
+   * @param file the file to extract
+   * @param state work unit state holding {@link ConfigurationKeys#COPY_EXPECTED_SCHEMA}; may be {@code null}, in
+   *     which case no expected schema is available and every regular file fails the check
+   */
+  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, CopyableFile file, WorkUnitState state) {
+    super(fs, file, state);
+  }
+
+  /**
+   * Creates a schema-checking extractor without a {@link WorkUnitState}. No expected schema is available then, so
+   * every regular file fails the check.
+   */
+  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, CopyableFile file) {
+    this(fs, file, null);
+  }
+
+  /**
+   * Checks the file's schema and, if it matches, delegates to the parent to build the record.
+   *
+   * @throws DataRecordException if a regular file's schema does not match the expected schema
+   * @throws IOException if the file cannot be opened or read as an Avro data file
+   */
+  @Override
+  protected FileAwareInputStream buildStream(FileSystem fsFromFile)
+      throws DataRecordException, IOException {
+    // Directories have no Avro header to read; the parent turns them into an empty-stream record.
+    if (!this.file.getFileStatus().isDirectory() && !schemaChecking(fsFromFile)) {
+      throw new DataRecordException("Schema does not match the expected schema");
+    }
+    return super.buildStream(fsFromFile);
+  }
+
+  /**
+   * Reads the writer schema from the Avro file header and compares it with the expected schema.
+   *
+   * <p>The comparison is an exact string match against {@link Schema#toString()}, so the configured JSON must use
+   * the same compact formatting and key order that Avro produces.
+   *
+   * @param fsFromFile file system used to open the file
+   * @return {@code true} only if an expected schema is configured and equals the file's schema
+   * @throws IOException if the file cannot be opened or is not an Avro data file
+   */
+  protected boolean schemaChecking(FileSystem fsFromFile)
+      throws IOException {
+    String expectedSchema = this.state == null ? null : this.state.getProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA);
+    if (expectedSchema == null) {
+      // Nothing can equal a missing schema; return early instead of opening the file (or dereferencing a null state).
+      return false;
+    }
+    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+    // Close both resources on every path, including when the file turns out not to be an Avro data file.
+    try (FsInput input = new FsInput(this.file.getFileStatus().getPath(), fsFromFile);
+        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(input, datumReader)) {
+      Schema schema = dataFileReader.getSchema();
+      return schema.toString().equals(expectedSchema);
+    }
+  }
+}
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index 24b18a5..96d917b 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -20,14 +20,26 @@ package org.apache.gobblin.runtime.embedded;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.converter.GobblinMetricsPinotFlattenerConverter;
 import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.SchemaCheckedCopySource;
+import org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractorWithCheckSchema;
+import org.apache.gobblin.runtime.api.JobExecutionResult;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
@@ -72,6 +84,57 @@ public class EmbeddedGobblinDistcpTest {
   }
 
   @Test
+  public void testCheckSchema() throws Exception {
+    Schema schema;
+    // Fail here, with a clear message, if the schema resource is missing instead of hitting an NPE later.
+    try (InputStream is = GobblinMetricsPinotFlattenerConverter.class.getClassLoader().getResourceAsStream("avroSchemaManagerTest/expectedSchema.avsc")) {
+      Assert.assertNotNull(is, "Missing test resource avroSchemaManagerTest/expectedSchema.avsc");
+      schema = new Schema.Parser().parse(is);
+    }
+    String fileName = "file.avro";
+
+    File tmpSource = Files.createTempDir();
+    tmpSource.deleteOnExit();
+    File tmpTarget = Files.createTempDir();
+    tmpTarget.deleteOnExit();
+
+    File tmpFile = new File(tmpSource, fileName);
+
+    // Close the writer before running the copy: appended records are only guaranteed on disk once it is closed,
+    // and an unclosed writer leaks its file handle.
+    GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+      dataFileWriter.create(schema, tmpFile);
+      for (int i = 0; i < 100; i++) {
+        GenericRecord record = new GenericData.Record(schema);
+        record.put("foo", i);
+        dataFileWriter.append(record);
+      }
+    }
+
+    Assert.assertTrue(new File(tmpSource, fileName).exists());
+    Assert.assertFalse(new File(tmpTarget, fileName).exists());
+
+    EmbeddedGobblinDistcp embedded = new EmbeddedGobblinDistcp(new Path(tmpSource.getAbsolutePath()),
+        new Path(tmpTarget.getAbsolutePath()));
+    embedded.setLaunchTimeout(30, TimeUnit.SECONDS);
+    embedded.setConfiguration(ConfigurationKeys.SOURCE_CLASS_KEY, SchemaCheckedCopySource.class.getName());
+    // The file's schema is not the expected one: the job must fail and nothing may be copied.
+    embedded.setConfiguration(ConfigurationKeys.COPY_EXPECTED_SCHEMA, "{\"type\":\"record\",\"name\":\"baseRecord\",\"fields\":[{\"name\":\"foo1\",\"type\":[\"null\",\"int\"],\"default\":null}]}");
+    JobExecutionResult result = embedded.run();
+    Assert.assertTrue(new File(tmpSource, fileName).exists());
+    Assert.assertFalse(result.isSuccessful());
+    Assert.assertFalse(new File(tmpTarget, fileName).exists());
+
+    // The file's schema is the expected one: the job must succeed and the file must be copied.
+    embedded.setConfiguration(ConfigurationKeys.COPY_EXPECTED_SCHEMA, "{\"type\":\"record\",\"name\":\"baseRecord\",\"fields\":[{\"name\":\"foo\",\"type\":[\"null\",\"int\"],\"default\":null}]}");
+    result = embedded.run();
+    Assert.assertTrue(result.isSuccessful());
+    Assert.assertTrue(new File(tmpSource, fileName).exists());
+    Assert.assertTrue(new File(tmpTarget, fileName).exists());
+  }
+
+  @Test
   public void testWithVersionPreserve() throws Exception {
     String fileName = "file";