You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/19 17:29:41 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-726] enable
schema check for ticket ETL-8753,
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 620fd6d [GOBBLIN-726] enable schema check for ticket ETL-8753,
620fd6d is described below
commit 620fd6de6d0dd51f438326115fb1c0618865dc12
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Fri Apr 19 10:29:35 2019 -0700
[GOBBLIN-726] enable schema check for ticket ETL-8753,
Closes #2593 from ZihanLi58/checkSchema
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../management/copy/SchemaCheckedCopySource.java | 40 +++++++++++++
.../extractor/FileAwareInputStreamExtractor.java | 43 +++++++------
...leAwareInputStreamExtractorWithCheckSchema.java | 70 ++++++++++++++++++++++
.../embedded/EmbeddedGobblinDistcpTest.java | 64 ++++++++++++++++++++
5 files changed, 199 insertions(+), 19 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index fc648be..8cdddb8 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -539,6 +539,7 @@ public class ConfigurationKeys {
* Configuration properties used by the CopySource.
*/
public static final String COPY_SOURCE_FILESET_WU_GENERATOR_CLASS = "copy.source.fileset.wu.generator.class";
+ public static final String COPY_EXPECTED_SCHEMA = "gobblin.copy.expectedSchema";
/**
* Configuration properties used by the FileBasedExtractor
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/SchemaCheckedCopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/SchemaCheckedCopySource.java
new file mode 100644
index 0000000..0c197d7
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/SchemaCheckedCopySource.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractorWithCheckSchema;
+import org.apache.gobblin.source.extractor.Extractor;
+
+
+/**
+ * A {@link CopySource} variant for copy jobs that must verify a file's schema before copying it.
+ *
+ * <p>It overrides {@link #extractorForCopyableFile} so that each {@link CopyableFile} is read through a
+ * {@link FileAwareInputStreamExtractorWithCheckSchema} rather than whatever extractor {@link CopySource}
+ * would otherwise create.
+ */
+public class SchemaCheckedCopySource extends CopySource {
+
+  /**
+   * Builds the schema-checking extractor for a single copyable file.
+   *
+   * @param fs the {@link FileSystem} handed to the extractor
+   * @param cf the file to be copied
+   * @param state the work unit state handed to the extractor
+   * @return a {@link FileAwareInputStreamExtractorWithCheckSchema} for {@code cf}
+   * @throws IOException declared by the overridden method; nothing in this body throws it directly
+   */
+  @Override
+  protected Extractor<String, FileAwareInputStream> extractorForCopyableFile(FileSystem fs, CopyableFile cf,
+      WorkUnitState state) throws IOException {
+    final Extractor<String, FileAwareInputStream> schemaCheckingExtractor =
+        new FileAwareInputStreamExtractorWithCheckSchema(fs, cf, state);
+    return schemaCheckingExtractor;
+  }
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
index 9863a98..6a5a34d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
@@ -48,11 +48,11 @@ import org.apache.gobblin.util.io.MeteredInputStream;
*/
public class FileAwareInputStreamExtractor implements Extractor<String, FileAwareInputStream> {
- private final FileSystem fs;
- private final CopyableFile file;
- private final WorkUnitState state;
+ protected final FileSystem fs;
+ protected final CopyableFile file;
+ protected final WorkUnitState state;
/** True indicates the unique record has already been read. */
- private boolean recordRead;
+ protected boolean recordRead;
public FileAwareInputStreamExtractor(FileSystem fs, CopyableFile file, WorkUnitState state) {
this.fs = fs;
@@ -83,24 +83,29 @@ public class FileAwareInputStreamExtractor implements Extractor<String, FileAwar
Configuration conf =
this.state == null ? HadoopUtils.newConfiguration() : HadoopUtils.getConfFromState(this.state);
FileSystem fsFromFile = this.file.getOrigin().getPath().getFileSystem(conf);
- this.recordRead = true;
- FileAwareInputStream.FileAwareInputStreamBuilder builder = FileAwareInputStream.builder().file(this.file);
- if (this.file.getFileStatus().isDirectory()) {
- return builder.inputStream(EmptyInputStream.instance).build();
- }
+ return buildStream(fsFromFile);
+ }
+ return null;
+ }
- FSDataInputStream dataInputStream = fsFromFile.open(this.file.getFileStatus().getPath());
- if (this.state != null && DistcpFileSplitter.isSplitWorkUnit(this.state)) {
- Optional<DistcpFileSplitter.Split> split = DistcpFileSplitter.getSplit(this.state);
- builder.split(split);
- if (split.isPresent()) {
- dataInputStream.seek(split.get().getLowPosition());
- }
+ protected FileAwareInputStream buildStream(FileSystem fsFromFile)
+ throws DataRecordException, IOException{
+ this.recordRead = true;
+ FileAwareInputStream.FileAwareInputStreamBuilder builder = FileAwareInputStream.builder().file(this.file);
+ if (this.file.getFileStatus().isDirectory()) {
+ return builder.inputStream(EmptyInputStream.instance).build();
+ }
+
+ FSDataInputStream dataInputStream = fsFromFile.open(this.file.getFileStatus().getPath());
+ if (this.state != null && DistcpFileSplitter.isSplitWorkUnit(this.state)) {
+ Optional<DistcpFileSplitter.Split> split = DistcpFileSplitter.getSplit(this.state);
+ builder.split(split);
+ if (split.isPresent()) {
+ dataInputStream.seek(split.get().getLowPosition());
}
- builder.inputStream(MeteredInputStream.builder().in(dataInputStream).build());
- return builder.build();
}
- return null;
+ builder.inputStream(MeteredInputStream.builder().in(dataInputStream).build());
+ return builder.build();
}
/**
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
new file mode 100644
index 0000000..34c4ab2
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.extractor;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.source.extractor.DataRecordException;
+
+/**
+ * A {@link FileAwareInputStreamExtractor} that validates the Avro schema of the file before building its
+ * {@link InputStream}. The schema stored in the file is compared against the schema string configured under
+ * {@link ConfigurationKeys#COPY_EXPECTED_SCHEMA}; on a mismatch {@link #buildStream} throws a
+ * {@link DataRecordException} instead of returning a stream.
+ */
+public class FileAwareInputStreamExtractorWithCheckSchema extends FileAwareInputStreamExtractor {
+
+  /**
+   * @param fs file system passed through to {@link FileAwareInputStreamExtractor}
+   * @param file the file whose schema is checked and whose stream is extracted
+   * @param state work unit state from which the expected schema is read; may be {@code null}
+   */
+  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, CopyableFile file, WorkUnitState state) {
+    super(fs, file, state);
+  }
+
+  /**
+   * Creates an extractor without a work unit state. With no state there is no expected schema to compare
+   * against, so {@link #schemaChecking} reports a mismatch for every regular file.
+   */
+  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, CopyableFile file) {
+    this(fs, file, null);
+  }
+
+  /**
+   * Checks the schema of the file and, if it matches, delegates to the parent implementation.
+   *
+   * @param fsFromFile file system resolved from the origin path of the file
+   * @return the stream built by {@link FileAwareInputStreamExtractor#buildStream}
+   * @throws DataRecordException if the file's schema does not match the expected schema
+   * @throws IOException if the file cannot be opened or read
+   */
+  @Override
+  protected FileAwareInputStream buildStream(FileSystem fsFromFile) throws DataRecordException, IOException {
+    // The parent returns an empty stream for directories; a directory carries no Avro header, so trying to
+    // read a schema from it could only fail. Skip the check for such entries.
+    boolean isDirectory = this.file.getFileStatus().isDirectory();
+    if (!isDirectory && !schemaChecking(fsFromFile)) {
+      throw new DataRecordException("Schema does not match the expected schema");
+    }
+    return super.buildStream(fsFromFile);
+  }
+
+  /**
+   * Compares the schema embedded in the Avro file with the configured expected schema.
+   *
+   * <p>The comparison is a plain string comparison between {@link Schema#toString()} of the file's schema and
+   * the configured value, so the configured schema must be in the same textual form.
+   *
+   * @param fsFromFile file system used to open the file
+   * @return {@code true} if the schemas match; {@code false} if they differ, if there is no state, or if no
+   *         expected schema is configured
+   * @throws IOException if the file cannot be opened or is not a readable Avro data file
+   */
+  protected boolean schemaChecking(FileSystem fsFromFile) throws IOException {
+    // The two-argument constructor passes a null state; report a mismatch rather than dereferencing it.
+    if (this.state == null) {
+      return false;
+    }
+    String expectedSchema = this.state.getProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA);
+    if (expectedSchema == null) {
+      return false;
+    }
+
+    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+    // try-with-resources closes the reader (and the FsInput it wraps) once the header has been read.
+    try (DataFileReader<GenericRecord> dataFileReader =
+        new DataFileReader<>(new FsInput(this.file.getFileStatus().getPath(), fsFromFile), datumReader)) {
+      Schema schema = dataFileReader.getSchema();
+      return schema.toString().equals(expectedSchema);
+    }
+  }
+}
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index 24b18a5..96d917b 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -20,14 +20,26 @@ package org.apache.gobblin.runtime.embedded;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.converter.GobblinMetricsPinotFlattenerConverter;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.SchemaCheckedCopySource;
+import org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractorWithCheckSchema;
+import org.apache.gobblin.runtime.api.JobExecutionResult;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
@@ -72,6 +84,58 @@ public class EmbeddedGobblinDistcpTest {
}
@Test
+  /**
+   * Verifies that a distcp job using {@link SchemaCheckedCopySource} fails and copies nothing when the
+   * configured expected schema differs from the schema of the source Avro file, and succeeds and copies the
+   * file when the two schemas match.
+   */
+  public void testCheckSchema() throws Exception {
+    // Load the schema used to write the source file. Any failure here propagates and fails the test
+    // immediately instead of leaving the schema null for later statements.
+    Schema schema;
+    try (InputStream is = GobblinMetricsPinotFlattenerConverter.class.getClassLoader()
+        .getResourceAsStream("avroSchemaManagerTest/expectedSchema.avsc")) {
+      Assert.assertNotNull(is, "Schema resource avroSchemaManagerTest/expectedSchema.avsc not found");
+      schema = new Schema.Parser().parse(is);
+    }
+    String fileName = "file.avro";
+
+    File tmpSource = Files.createTempDir();
+    tmpSource.deleteOnExit();
+    File tmpTarget = Files.createTempDir();
+    tmpTarget.deleteOnExit();
+
+    File tmpFile = new File(tmpSource, fileName);
+    tmpFile.createNewFile();
+
+    GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+    // Closing the writer flushes the appended records to disk and releases the file handle before the
+    // copy job reads the file.
+    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+      dataFileWriter.create(schema, tmpFile);
+      for (int i = 0; i < 100; i++) {
+        GenericRecord record = new GenericData.Record(schema);
+        record.put("foo", i);
+        dataFileWriter.append(record);
+      }
+    }
+
+    Assert.assertTrue(new File(tmpSource, fileName).exists());
+    Assert.assertFalse(new File(tmpTarget, fileName).exists());
+
+    EmbeddedGobblinDistcp embedded = new EmbeddedGobblinDistcp(new Path(tmpSource.getAbsolutePath()),
+        new Path(tmpTarget.getAbsolutePath()));
+    embedded.setLaunchTimeout(30, TimeUnit.SECONDS);
+    embedded.setConfiguration(ConfigurationKeys.SOURCE_CLASS_KEY, SchemaCheckedCopySource.class.getName());
+    //test when schema is not the expected one, the job will be aborted.
+    embedded.setConfiguration(ConfigurationKeys.COPY_EXPECTED_SCHEMA, "{\"type\":\"record\",\"name\":\"baseRecord\",\"fields\":[{\"name\":\"foo1\",\"type\":[\"null\",\"int\"],\"default\":null}]}");
+    JobExecutionResult result = embedded.run();
+    Assert.assertTrue(new File(tmpSource, fileName).exists());
+    Assert.assertFalse(result.isSuccessful());
+    Assert.assertFalse(new File(tmpTarget, fileName).exists());
+
+    //test when schema is the expected one, the job will succeed.
+    embedded.setConfiguration(ConfigurationKeys.COPY_EXPECTED_SCHEMA, "{\"type\":\"record\",\"name\":\"baseRecord\",\"fields\":[{\"name\":\"foo\",\"type\":[\"null\",\"int\"],\"default\":null}]}");
+    result = embedded.run();
+    Assert.assertTrue(result.isSuccessful());
+    Assert.assertTrue(new File(tmpSource, fileName).exists());
+    Assert.assertTrue(new File(tmpTarget, fileName).exists());
+  }
+
+ @Test
public void testWithVersionPreserve() throws Exception {
String fileName = "file";