Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/12 20:37:10 UTC

incubator-gobblin git commit: [GOBBLIN-583] Add support for writing the schema file using a staging file when there is an existing target file

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a6ec4a9af -> d3ba836f9


[GOBBLIN-583] Add support for writing the schema file using a staging file when there is an existing target file

Closes #2447 from htran1/schema_avsc_permissions
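
When the Avro schema is too long for the SerDe schema literal, HiveAvroSerDeManager registers it through a schema file (by default _schema.avsc). Previously an existing schema file was deleted before the new one was created, so readers could briefly see a missing or partially written file. This change optionally stages the write through a temporary file that is then swapped into place, keeping the replacement as close to atomic as the file system's rename allows.

A minimal sketch of the staging pattern, assuming a Hadoop FileSystem handle and an already-resolved Avro Schema; the class and method names here are illustrative, and the committed implementation (AvroUtils.writeSchemaToFile in the diff below) swaps files with HadoopUtils.movePath rather than a bare rename:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;

public class StagedSchemaWriteSketch {

  /**
   * Write the schema to a staging file first, then swap it into place of the
   * existing target, so readers never observe a missing or half-written file.
   */
  public static void writeViaStaging(Schema schema, Path target, Path staging, FileSystem fs)
      throws IOException {
    // Write and set permissions on the staging file, not the live target.
    try (DataOutputStream dos = fs.create(staging)) {
      dos.writeChars(schema.toString());
    }
    fs.setPermission(staging, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ));

    // Replace the existing target with the staged copy.
    if (fs.exists(target) && !fs.delete(target, true)) {
      throw new IOException("Failed to delete " + target + " before rename");
    }
    if (!fs.rename(staging, target)) {
      throw new IOException("Failed to rename " + staging + " to " + target);
    }
  }
}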


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d3ba836f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d3ba836f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d3ba836f

Branch: refs/heads/master
Commit: d3ba836f96600a07e975a00651a895043cc37a53
Parents: a6ec4a9
Author: Hung Tran <hu...@linkedin.com>
Authored: Wed Sep 12 13:37:04 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Sep 12 13:37:04 2018 -0700

----------------------------------------------------------------------
 .../gobblin/hive/avro/HiveAvroSerDeManager.java |  16 ++-
 .../hive/avro/HiveAvroSerDeManagerTest.java     | 143 +++++++++++++++++++
 .../resources/test-hive-table/hive-test.avsc    | Bin 0 -> 402 bytes
 .../java/org/apache/gobblin/util/AvroUtils.java |  52 ++++++-
 4 files changed, 205 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d3ba836f/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
index 7277c2e..6a8f1f6 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
@@ -56,6 +56,10 @@ public class HiveAvroSerDeManager extends HiveSerDeManager {
   public static final boolean DEFAULT_USE_SCHEMA_FILE = false;
   public static final String SCHEMA_FILE_NAME = "schema.file.name";
   public static final String DEFAULT_SCHEMA_FILE_NAME = "_schema.avsc";
+  public static final String SCHEMA_TEMP_FILE_NAME = "schema.temp.file.name";
+  public static final String DEFAULT_SCHEMA_TEMP_FILE_NAME = "_schema_temp.avsc";
+  public static final String USE_SCHEMA_TEMP_FILE = "use.schema.temp.file";
+  public static final boolean DEFAULT_USE_SCHEMA_TEMP_FILE = false;
   public static final String SCHEMA_LITERAL_LENGTH_LIMIT = "schema.literal.length.limit";
   public static final int DEFAULT_SCHEMA_LITERAL_LENGTH_LIMIT = 4000;
   public static final String HIVE_SPEC_SCHEMA_READING_TIMER = "hiveAvroSerdeManager.schemaReadTimer";
@@ -64,6 +68,8 @@ public class HiveAvroSerDeManager extends HiveSerDeManager {
   protected final FileSystem fs;
   protected final boolean useSchemaFile;
   protected final String schemaFileName;
+  protected final boolean useSchemaTempFile;
+  protected final String schemaTempFileName;
   protected final int schemaLiteralLengthLimit;
   protected final HiveSerDeWrapper serDeWrapper = HiveSerDeWrapper.get("AVRO");
 
@@ -79,7 +85,9 @@ public class HiveAvroSerDeManager extends HiveSerDeManager {
     }
 
     this.useSchemaFile = props.getPropAsBoolean(USE_SCHEMA_FILE, DEFAULT_USE_SCHEMA_FILE);
+    this.useSchemaTempFile = props.getPropAsBoolean(USE_SCHEMA_TEMP_FILE, DEFAULT_USE_SCHEMA_TEMP_FILE);
     this.schemaFileName = props.getProp(SCHEMA_FILE_NAME, DEFAULT_SCHEMA_FILE_NAME);
+    this.schemaTempFileName = props.getProp(SCHEMA_TEMP_FILE_NAME, DEFAULT_SCHEMA_TEMP_FILE_NAME);
     this.schemaLiteralLengthLimit =
         props.getPropAsInt(SCHEMA_LITERAL_LENGTH_LIMIT, DEFAULT_SCHEMA_LITERAL_LENGTH_LIMIT);
 
@@ -172,7 +180,13 @@ public class HiveAvroSerDeManager extends HiveSerDeManager {
     if (schemaStr.length() <= this.schemaLiteralLengthLimit) {
       hiveUnit.setSerDeProp(SCHEMA_LITERAL, schema.toString());
     } else {
-      AvroUtils.writeSchemaToFile(schema, schemaFile, this.fs, true);
+      Path schemaTempFile = null;
+
+      if (useSchemaTempFile) {
+        schemaTempFile = new Path(schemaFile.getParent(), this.schemaTempFileName);
+      }
+
+      AvroUtils.writeSchemaToFile(schema, schemaFile, schemaTempFile, this.fs, true);
       log.info("Using schema file " + schemaFile.toString());
       hiveUnit.setSerDeProp(SCHEMA_URL, schemaFile.toString());
     }
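
For reference, a sketch of the job properties that exercise this path, using the key names defined by the constants above; whether these properties reach the HiveAvroSerDeManager State depends on how the registration job is wired, so treat the snippet as illustrative:

# Fall back to a schema file once the literal would exceed this length (default 4000)
schema.literal.length.limit=4000
schema.file.name=_schema.avsc
# New in this change: stage the write through a temp file when the target already exists
use.schema.temp.file=true
schema.temp.file.name=_schema_temp.avsc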

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d3ba836f/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManagerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManagerTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManagerTest.java
new file mode 100644
index 0000000..054ca40
--- /dev/null
+++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManagerTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive.avro;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.HiveRegistrationUnit;
+import org.apache.gobblin.hive.HiveTable;
+
+
+@Test(singleThreaded = true)
+public class HiveAvroSerDeManagerTest {
+  private static String TEST_DB = "testDB";
+  private static String TEST_TABLE = "testTable";
+  private Path testBasePath;
+
+  @BeforeClass
+  public void setUp() throws IOException {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    this.testBasePath = new Path("testdir");
+    fs.delete(this.testBasePath, true);
+    fs.delete(this.testBasePath, true);
+
+    fs.mkdirs(this.testBasePath);
+
+    Files.copy(this.getClass().getResourceAsStream("/test-hive-table/hive-test.avro"),
+        Paths.get(this.testBasePath.toString(), "hive-test.avro"), StandardCopyOption.REPLACE_EXISTING);
+  }
+
+  /**
+   * Test that the schema is written to the schema literal
+   */
+  @Test
+  public void testSchemaLiteral() throws IOException {
+    State state = new State();
+    HiveAvroSerDeManager manager = new HiveAvroSerDeManager(state);
+    HiveRegistrationUnit registrationUnit = (new HiveTable.Builder()).withDbName(TEST_DB).withTableName(TEST_TABLE).build();
+
+    manager.addSerDeProperties(this.testBasePath, registrationUnit);
+
+    Assert.assertTrue(registrationUnit.getSerDeProps().getProp(HiveAvroSerDeManager.SCHEMA_LITERAL).contains("example.avro"));
+  }
+
+  @Test
+  public void testSchemaUrl() throws IOException {
+    State state = new State();
+    state.setProp(HiveAvroSerDeManager.SCHEMA_LITERAL_LENGTH_LIMIT, "10");
+
+    validateSchemaUrl(state, HiveAvroSerDeManager.DEFAULT_SCHEMA_FILE_NAME, false);
+  }
+
+  @Test
+  public void testSchemaUrlWithExistingFile() throws IOException {
+    State state = new State();
+    state.setProp(HiveAvroSerDeManager.SCHEMA_LITERAL_LENGTH_LIMIT, "10");
+
+    validateSchemaUrl(state, HiveAvroSerDeManager.DEFAULT_SCHEMA_FILE_NAME, true);
+  }
+
+  @Test
+  public void testSchemaUrlWithTempFile() throws IOException {
+    final String SCHEMA_FILE_NAME = "test_temp.avsc";
+    State state = new State();
+    state.setProp(HiveAvroSerDeManager.SCHEMA_LITERAL_LENGTH_LIMIT, "10");
+    state.setProp(HiveAvroSerDeManager.USE_SCHEMA_TEMP_FILE, "true");
+    state.setProp(HiveAvroSerDeManager.SCHEMA_FILE_NAME, SCHEMA_FILE_NAME);
+    state.setProp(HiveAvroSerDeManager.USE_SCHEMA_TEMP_FILE, "true");
+
+    validateSchemaUrl(state, SCHEMA_FILE_NAME, false);
+  }
+
+  @Test
+  public void testSchemaUrlWithTempFileAndExistingFile() throws IOException {
+    final String SCHEMA_FILE_NAME = "test_temp.avsc";
+    State state = new State();
+    state.setProp(HiveAvroSerDeManager.SCHEMA_LITERAL_LENGTH_LIMIT, "10");
+    state.setProp(HiveAvroSerDeManager.USE_SCHEMA_TEMP_FILE, "true");
+    state.setProp(HiveAvroSerDeManager.SCHEMA_FILE_NAME, SCHEMA_FILE_NAME);
+    state.setProp(HiveAvroSerDeManager.USE_SCHEMA_TEMP_FILE, "true");
+
+    validateSchemaUrl(state, SCHEMA_FILE_NAME, true);
+  }
+
+  private void validateSchemaUrl(State state, String targetSchemaFileName, boolean createConflictingFile) throws IOException {
+    HiveAvroSerDeManager manager = new HiveAvroSerDeManager(state);
+    HiveRegistrationUnit registrationUnit = (new HiveTable.Builder()).withDbName(TEST_DB).withTableName(TEST_TABLE).build();
+
+    // Clean up existing file
+    String targetPathStr = new Path(this.testBasePath, targetSchemaFileName).toString();
+    File targetFile = new File(targetPathStr);
+    targetFile.delete();
+
+    // create a conflicting file
+    if (createConflictingFile) {
+      targetFile.createNewFile();
+    }
+
+    manager.addSerDeProperties(this.testBasePath, registrationUnit);
+
+    Assert.assertNull(registrationUnit.getSerDeProps().getProp(HiveAvroSerDeManager.SCHEMA_LITERAL));
+    String schemaUrl = registrationUnit.getSerDeProps().getProp(HiveAvroSerDeManager.SCHEMA_URL);
+    Assert.assertEquals(schemaUrl, targetPathStr);
+    Assert.assertTrue(IOUtils.contentEquals(this.getClass().getResourceAsStream("/test-hive-table/hive-test.avsc"),
+        new FileInputStream(schemaUrl)));
+  }
+
+  @AfterClass
+  public void tearDown() throws IOException {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    fs.delete(this.testBasePath, true);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d3ba836f/gobblin-hive-registration/src/test/resources/test-hive-table/hive-test.avsc
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/test/resources/test-hive-table/hive-test.avsc b/gobblin-hive-registration/src/test/resources/test-hive-table/hive-test.avsc
new file mode 100755
index 0000000..80fc01e
Binary files /dev/null and b/gobblin-hive-registration/src/test/resources/test-hive-table/hive-test.avsc differ

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d3ba836f/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 52de135..36a88bb 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -387,21 +387,63 @@ public class AvroUtils {
 
   public static void writeSchemaToFile(Schema schema, Path filePath, FileSystem fs, boolean overwrite)
       throws IOException {
-    writeSchemaToFile(schema, filePath, fs, overwrite, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ));
+    writeSchemaToFile(schema, filePath, null, fs, overwrite);
+  }
+
+  public static void writeSchemaToFile(Schema schema, Path filePath, Path tempFilePath, FileSystem fs, boolean overwrite)
+      throws IOException {
+    writeSchemaToFile(schema, filePath, tempFilePath, fs, overwrite, new FsPermission(FsAction.ALL, FsAction.ALL,
+        FsAction.READ));
   }
 
   public static void writeSchemaToFile(Schema schema, Path filePath, FileSystem fs, boolean overwrite, FsPermission perm)
     throws IOException {
+    writeSchemaToFile(schema, filePath, null, fs, overwrite, perm);
+  }
+
+  /**
+   * Write a schema to a file
+   * @param schema the schema
+   * @param filePath the target file
+   * @param tempFilePath if not null then this path is used for a temporary file used to stage the write
+   * @param fs a {@link FileSystem}
+   * @param overwrite should any existing target file be overwritten?
+   * @param perm permissions
+   * @throws IOException
+   */
+  public static void writeSchemaToFile(Schema schema, Path filePath, Path tempFilePath, FileSystem fs, boolean overwrite,
+      FsPermission perm)
+      throws IOException {
+    boolean fileExists = fs.exists(filePath);
+
     if (!overwrite) {
-      Preconditions.checkState(!fs.exists(filePath), filePath + " already exists");
+      Preconditions.checkState(!fileExists, filePath + " already exists");
     } else {
-      HadoopUtils.deletePath(fs, filePath, true);
+      // delete the target file now if not using a staging file
+      if (fileExists && null == tempFilePath) {
+        HadoopUtils.deletePath(fs, filePath, true);
+        // file has been removed
+        fileExists = false;
+      }
     }
 
-    try (DataOutputStream dos = fs.create(filePath)) {
+    // If the file exists then write to a temp file to make the replacement as close to atomic as possible
+    Path writeFilePath = fileExists ? tempFilePath : filePath;
+
+    try (DataOutputStream dos = fs.create(writeFilePath)) {
       dos.writeChars(schema.toString());
     }
-    fs.setPermission(filePath, perm);
+    fs.setPermission(writeFilePath, perm);
+
+    // Replace existing file with the staged file
+    if (fileExists) {
+      if (!fs.delete(filePath, true)) {
+        throw new IOException(
+            String.format("Failed to delete %s while renaming %s to %s", filePath, tempFilePath, filePath));
+      }
+
+      HadoopUtils.movePath(fs, tempFilePath, fs, filePath, true, fs.getConf());
+    }
   }
 
   /**
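
For completeness, a hedged usage sketch of the new AvroUtils.writeSchemaToFile overload; the local paths and the stand-in schema are illustrative. Passing a non-null temp path stages the write through that file only when the target already exists, while passing null preserves the previous delete-then-create behavior:

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.gobblin.util.AvroUtils;

public class WriteSchemaToFileSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Schema schema = Schema.create(Schema.Type.STRING);  // stand-in schema for the sketch

    Path schemaFile = new Path("testdir/_schema.avsc");
    Path schemaTempFile = new Path("testdir/_schema_temp.avsc");

    // Stages through _schema_temp.avsc only if _schema.avsc already exists; overwrite = true.
    AvroUtils.writeSchemaToFile(schema, schemaFile, schemaTempFile, fs, true);
  }
}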