You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/12 20:37:10 UTC
incubator-gobblin git commit: [GOBBLIN-583] Add support for writing the schema file using a staging file when there is an existing target file
Repository: incubator-gobblin
Updated Branches:
refs/heads/master a6ec4a9af -> d3ba836f9
[GOBBLIN-583] Add support for writing the schema file using a staging file when there is an existing target file
Closes #2447 from htran1/schema_avsc_permissions
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d3ba836f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d3ba836f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d3ba836f
Branch: refs/heads/master
Commit: d3ba836f96600a07e975a00651a895043cc37a53
Parents: a6ec4a9
Author: Hung Tran <hu...@linkedin.com>
Authored: Wed Sep 12 13:37:04 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Sep 12 13:37:04 2018 -0700
----------------------------------------------------------------------
.../gobblin/hive/avro/HiveAvroSerDeManager.java | 16 ++-
.../hive/avro/HiveAvroSerDeManagerTest.java | 143 +++++++++++++++++++
.../resources/test-hive-table/hive-test.avsc | Bin 0 -> 402 bytes
.../java/org/apache/gobblin/util/AvroUtils.java | 52 ++++++-
4 files changed, 205 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d3ba836f/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
index 7277c2e..6a8f1f6 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
@@ -56,6 +56,10 @@ public class HiveAvroSerDeManager extends HiveSerDeManager {
public static final boolean DEFAULT_USE_SCHEMA_FILE = false;
public static final String SCHEMA_FILE_NAME = "schema.file.name";
public static final String DEFAULT_SCHEMA_FILE_NAME = "_schema.avsc";
+ public static final String SCHEMA_TEMP_FILE_NAME = "schema.temp.file.name";
+ public static final String DEFAULT_SCHEMA_TEMP_FILE_NAME = "_schema_temp.avsc";
+ public static final String USE_SCHEMA_TEMP_FILE = "use.schema.temp.file";
+ public static final boolean DEFAULT_USE_SCHEMA_TEMP_FILE = false;
public static final String SCHEMA_LITERAL_LENGTH_LIMIT = "schema.literal.length.limit";
public static final int DEFAULT_SCHEMA_LITERAL_LENGTH_LIMIT = 4000;
public static final String HIVE_SPEC_SCHEMA_READING_TIMER = "hiveAvroSerdeManager.schemaReadTimer";
@@ -64,6 +68,8 @@ public class HiveAvroSerDeManager extends HiveSerDeManager {
protected final FileSystem fs;
protected final boolean useSchemaFile;
protected final String schemaFileName;
+ protected final boolean useSchemaTempFile;
+ protected final String schemaTempFileName;
protected final int schemaLiteralLengthLimit;
protected final HiveSerDeWrapper serDeWrapper = HiveSerDeWrapper.get("AVRO");
@@ -79,7 +85,9 @@ public class HiveAvroSerDeManager extends HiveSerDeManager {
}
this.useSchemaFile = props.getPropAsBoolean(USE_SCHEMA_FILE, DEFAULT_USE_SCHEMA_FILE);
+ this.useSchemaTempFile = props.getPropAsBoolean(USE_SCHEMA_TEMP_FILE, DEFAULT_USE_SCHEMA_TEMP_FILE);
this.schemaFileName = props.getProp(SCHEMA_FILE_NAME, DEFAULT_SCHEMA_FILE_NAME);
+ this.schemaTempFileName = props.getProp(SCHEMA_TEMP_FILE_NAME, DEFAULT_SCHEMA_TEMP_FILE_NAME);
this.schemaLiteralLengthLimit =
props.getPropAsInt(SCHEMA_LITERAL_LENGTH_LIMIT, DEFAULT_SCHEMA_LITERAL_LENGTH_LIMIT);
@@ -172,7 +180,13 @@ public class HiveAvroSerDeManager extends HiveSerDeManager {
if (schemaStr.length() <= this.schemaLiteralLengthLimit) {
hiveUnit.setSerDeProp(SCHEMA_LITERAL, schema.toString());
} else {
- AvroUtils.writeSchemaToFile(schema, schemaFile, this.fs, true);
+ Path schemaTempFile = null;
+
+ if (useSchemaTempFile) {
+ schemaTempFile = new Path(schemaFile.getParent(), this.schemaTempFileName);
+ }
+
+ AvroUtils.writeSchemaToFile(schema, schemaFile, schemaTempFile, this.fs, true);
log.info("Using schema file " + schemaFile.toString());
hiveUnit.setSerDeProp(SCHEMA_URL, schemaFile.toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d3ba836f/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManagerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManagerTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManagerTest.java
new file mode 100644
index 0000000..054ca40
--- /dev/null
+++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManagerTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive.avro;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.HiveRegistrationUnit;
+import org.apache.gobblin.hive.HiveTable;
+
+
+@Test(singleThreaded = true)
+public class HiveAvroSerDeManagerTest {
+  private static final String TEST_DB = "testDB";
+  private static final String TEST_TABLE = "testTable";
+  /** Non-default target schema file name used by the staging-file tests */
+  private static final String CUSTOM_SCHEMA_FILE_NAME = "test_temp.avsc";
+  /** Classpath resource holding the schema the manager is expected to write */
+  private static final String EXPECTED_SCHEMA_RESOURCE = "/test-hive-table/hive-test.avsc";
+  private Path testBasePath;
+
+  /**
+   * Create a clean local test directory containing a copy of the test Avro data file.
+   */
+  @BeforeClass
+  public void setUp() throws IOException {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    this.testBasePath = new Path("testdir");
+    // Remove anything left behind by an earlier run before recreating the directory
+    fs.delete(this.testBasePath, true);
+    fs.mkdirs(this.testBasePath);
+
+    try (InputStream avroData = this.getClass().getResourceAsStream("/test-hive-table/hive-test.avro")) {
+      Files.copy(avroData, Paths.get(this.testBasePath.toString(), "hive-test.avro"),
+          StandardCopyOption.REPLACE_EXISTING);
+    }
+  }
+
+  /**
+   * Test that the schema is written to the schema literal
+   */
+  @Test
+  public void testSchemaLiteral() throws IOException {
+    State state = new State();
+    HiveAvroSerDeManager manager = new HiveAvroSerDeManager(state);
+    HiveRegistrationUnit registrationUnit = newRegistrationUnit();
+
+    manager.addSerDeProperties(this.testBasePath, registrationUnit);
+
+    Assert.assertTrue(registrationUnit.getSerDeProps().getProp(HiveAvroSerDeManager.SCHEMA_LITERAL)
+        .contains("example.avro"));
+  }
+
+  /**
+   * Test that a schema longer than the literal length limit is written to the default schema file
+   */
+  @Test
+  public void testSchemaUrl() throws IOException {
+    validateSchemaUrl(schemaFileState(), HiveAvroSerDeManager.DEFAULT_SCHEMA_FILE_NAME, false);
+  }
+
+  /**
+   * Test that an existing schema file is replaced when no staging file is configured
+   */
+  @Test
+  public void testSchemaUrlWithExistingFile() throws IOException {
+    validateSchemaUrl(schemaFileState(), HiveAvroSerDeManager.DEFAULT_SCHEMA_FILE_NAME, true);
+  }
+
+  /**
+   * Test writing a new schema file with the staging file enabled
+   */
+  @Test
+  public void testSchemaUrlWithTempFile() throws IOException {
+    validateSchemaUrl(stagingFileState(), CUSTOM_SCHEMA_FILE_NAME, false);
+  }
+
+  /**
+   * Test replacing an existing schema file through the staging file
+   */
+  @Test
+  public void testSchemaUrlWithTempFileAndExistingFile() throws IOException {
+    validateSchemaUrl(stagingFileState(), CUSTOM_SCHEMA_FILE_NAME, true);
+  }
+
+  /** Build the table registration unit shared by every test. */
+  private static HiveRegistrationUnit newRegistrationUnit() {
+    return (new HiveTable.Builder()).withDbName(TEST_DB).withTableName(TEST_TABLE).build();
+  }
+
+  /** A state whose literal length limit is small enough to force the schema into a schema file. */
+  private static State schemaFileState() {
+    State state = new State();
+    state.setProp(HiveAvroSerDeManager.SCHEMA_LITERAL_LENGTH_LIMIT, "10");
+    return state;
+  }
+
+  /** Like {@link #schemaFileState()}, but writing {@link #CUSTOM_SCHEMA_FILE_NAME} through the staging file. */
+  private static State stagingFileState() {
+    State state = schemaFileState();
+    state.setProp(HiveAvroSerDeManager.USE_SCHEMA_TEMP_FILE, "true");
+    state.setProp(HiveAvroSerDeManager.SCHEMA_FILE_NAME, CUSTOM_SCHEMA_FILE_NAME);
+    return state;
+  }
+
+  /**
+   * Register a table using {@code state} and check three things: the schema was written to
+   * {@code targetSchemaFileName} in the test directory, the file matches the expected schema resource, and no
+   * staging file is left behind.
+   *
+   * @param state the configuration for the {@link HiveAvroSerDeManager} under test
+   * @param targetSchemaFileName the schema file name the manager is expected to use
+   * @param createConflictingFile if true, an empty file is first created at the target path so the manager has to
+   *                              replace an existing file
+   */
+  private void validateSchemaUrl(State state, String targetSchemaFileName, boolean createConflictingFile)
+      throws IOException {
+    HiveAvroSerDeManager manager = new HiveAvroSerDeManager(state);
+    HiveRegistrationUnit registrationUnit = newRegistrationUnit();
+
+    // Clean up the target and staging files left by earlier tests
+    String targetPathStr = new Path(this.testBasePath, targetSchemaFileName).toString();
+    File targetFile = new File(targetPathStr);
+    targetFile.delete();
+    File stagingFile = new File(this.testBasePath.toString(), HiveAvroSerDeManager.DEFAULT_SCHEMA_TEMP_FILE_NAME);
+    stagingFile.delete();
+
+    // create a conflicting file
+    if (createConflictingFile) {
+      Assert.assertTrue(targetFile.createNewFile(), "Could not create " + targetFile);
+    }
+
+    manager.addSerDeProperties(this.testBasePath, registrationUnit);
+
+    Assert.assertNull(registrationUnit.getSerDeProps().getProp(HiveAvroSerDeManager.SCHEMA_LITERAL));
+    String schemaUrl = registrationUnit.getSerDeProps().getProp(HiveAvroSerDeManager.SCHEMA_URL);
+    Assert.assertEquals(schemaUrl, targetPathStr);
+    try (InputStream expected = this.getClass().getResourceAsStream(EXPECTED_SCHEMA_RESOURCE);
+        InputStream actual = new FileInputStream(schemaUrl)) {
+      Assert.assertTrue(IOUtils.contentEquals(expected, actual));
+    }
+    // When staging is used, the staged file must have been moved onto the target
+    Assert.assertFalse(stagingFile.exists(), "Staging file was left behind: " + stagingFile);
+  }
+
+  @AfterClass
+  public void tearDown() throws IOException {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    fs.delete(this.testBasePath, true);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d3ba836f/gobblin-hive-registration/src/test/resources/test-hive-table/hive-test.avsc
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/test/resources/test-hive-table/hive-test.avsc b/gobblin-hive-registration/src/test/resources/test-hive-table/hive-test.avsc
new file mode 100755
index 0000000..80fc01e
Binary files /dev/null and b/gobblin-hive-registration/src/test/resources/test-hive-table/hive-test.avsc differ
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d3ba836f/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 52de135..36a88bb 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -387,21 +387,63 @@ public class AvroUtils {
   public static void writeSchemaToFile(Schema schema, Path filePath, FileSystem fs, boolean overwrite)
       throws IOException {
-    writeSchemaToFile(schema, filePath, fs, overwrite, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ));
+    writeSchemaToFile(schema, filePath, null, fs, overwrite);
+  }
+
+  /**
+   * Write a schema to a file with the default permissions (all access for user and group, read access for others).
+   * See {@link #writeSchemaToFile(Schema, Path, Path, FileSystem, boolean, FsPermission)} for how
+   * {@code tempFilePath} is used.
+   */
+  public static void writeSchemaToFile(Schema schema, Path filePath, Path tempFilePath, FileSystem fs, boolean overwrite)
+      throws IOException {
+    writeSchemaToFile(schema, filePath, tempFilePath, fs, overwrite, new FsPermission(FsAction.ALL, FsAction.ALL,
+        FsAction.READ));
   }
   public static void writeSchemaToFile(Schema schema, Path filePath, FileSystem fs, boolean overwrite, FsPermission perm)
       throws IOException {
+    writeSchemaToFile(schema, filePath, null, fs, overwrite, perm);
+  }
+
+  /**
+   * Write a schema to a file.
+   *
+   * <p>Staging is used only when all three hold: {@code overwrite} is true, {@code filePath} already exists, and
+   * {@code tempFilePath} is not null. In that case the schema is first written to {@code tempFilePath}. The
+   * existing target is then deleted, and the staged file is moved onto {@code filePath}, so the target is absent
+   * only between that delete and the move. In every other case the schema is written directly to
+   * {@code filePath}, after deleting an existing target when {@code overwrite} is true, and {@code tempFilePath}
+   * is not touched.
+   *
+   * @param schema the schema
+   * @param filePath the target file
+   * @param tempFilePath if not null, the staging file used when replacing an existing target; must differ from
+   *                     {@code filePath}
+   * @param fs a {@link FileSystem}
+   * @param overwrite should any existing target file be overwritten?
+   * @param perm permissions set on the file as it is written (the staging file when staging is used)
+   * @throws IllegalArgumentException if {@code tempFilePath} equals {@code filePath}
+   * @throws IllegalStateException if {@code overwrite} is false and {@code filePath} already exists
+   * @throws IOException if writing, deleting or moving a file fails
+   */
+  public static void writeSchemaToFile(Schema schema, Path filePath, Path tempFilePath, FileSystem fs, boolean overwrite,
+      FsPermission perm)
+      throws IOException {
+    // Staging onto the target itself would overwrite it, delete it, then fail the move: the schema would be lost
+    Preconditions.checkArgument(tempFilePath == null || !tempFilePath.equals(filePath),
+        "Staging file %s must differ from the target file", tempFilePath);
+    boolean fileExists = fs.exists(filePath);
+
     if (!overwrite) {
-      Preconditions.checkState(!fs.exists(filePath), filePath + " already exists");
-    } else {
-      HadoopUtils.deletePath(fs, filePath, true);
+      Preconditions.checkState(!fileExists, filePath + " already exists");
+    } else if (fileExists && tempFilePath == null) {
+      // No staging file was supplied: delete the existing target now and write the new one in its place
+      HadoopUtils.deletePath(fs, filePath, true);
+      fileExists = false;
     }
-    try (DataOutputStream dos = fs.create(filePath)) {
-      dos.writeChars(schema.toString());
-    }
-    fs.setPermission(filePath, perm);
+
+    if (!fileExists) {
+      writeSchemaAndSetPermission(schema, filePath, fs, perm);
+      return;
+    }
+
+    // The target exists and must be replaced: stage the new content first so the target is missing only
+    // between the delete and the move below
+    try {
+      writeSchemaAndSetPermission(schema, tempFilePath, fs, perm);
+      if (!fs.delete(filePath, true)) {
+        throw new IOException(
+            String.format("Failed to delete %s while renaming %s to %s", filePath, tempFilePath, filePath));
+      }
+    } catch (IOException | RuntimeException e) {
+      // Best-effort removal of the staged file so that a failed replacement does not leave it behind
+      try {
+        fs.delete(tempFilePath, false);
+      } catch (IOException cleanupFailure) {
+        e.addSuppressed(cleanupFailure);
+      }
+      throw e;
+    }
+
+    HadoopUtils.movePath(fs, tempFilePath, fs, filePath, true, fs.getConf());
+  }
+
+  /**
+   * Write {@code schema.toString()} to {@code path} using {@link DataOutputStream#writeChars(String)}, then apply
+   * {@code perm} to the written file.
+   */
+  private static void writeSchemaAndSetPermission(Schema schema, Path path, FileSystem fs, FsPermission perm)
+      throws IOException {
+    try (DataOutputStream dos = fs.create(path)) {
+      dos.writeChars(schema.toString());
+    }
+    fs.setPermission(path, perm);
   }
/**