You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/01/25 19:31:01 UTC

incubator-gobblin git commit: [GOBBLIN-671] Close the underlying writer when a HiveWritableHdfsDataWriter is closed

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 55a19bbfe -> 60d4e61d0


[GOBBLIN-671] Close the underlying writer when a HiveWritableHdfsDataWriter is closed

Closes #2541 from htran1/hive_writer_close


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/60d4e61d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/60d4e61d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/60d4e61d

Branch: refs/heads/master
Commit: 60d4e61d09664e4ea2af511a3a53f6cd81d1963a
Parents: 55a19bb
Author: Hung Tran <hu...@linkedin.com>
Authored: Fri Jan 25 11:30:55 2019 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Jan 25 11:30:55 2019 -0800

----------------------------------------------------------------------
 .../org/apache/gobblin/writer/FsDataWriter.java | 29 ++++---
 .../writer/HiveWritableHdfsDataWriter.java      | 20 ++++-
 .../writer/HiveWritableHdfsDataWriterTest.java  | 89 ++++++++++++++++++++
 .../resources/writer/hive_writer.properties     | 21 +++++
 4 files changed, 144 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60d4e61d/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index d814622..dadb51f 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -261,6 +261,21 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
     // For the same reason as deleting the staging file if it already exists, overwrite
     // the output file if it already exists to prevent task retry from being blocked.
     HadoopUtils.renamePath(this.fileContext, this.stagingFile, this.outputFile, true);
+
+    // The staging file is moved to the output path in commit, so rename to add record count after that
+    if (this.shouldIncludeRecordCountInFileName) {
+      String filePathWithRecordCount = addRecordCountToFileName();
+      this.properties.appendToSetProp(this.allOutputFilesPropName, filePathWithRecordCount);
+    } else {
+      this.properties.appendToSetProp(this.allOutputFilesPropName, getOutputFilePath());
+    }
+
+    FsWriterMetrics metrics = new FsWriterMetrics(
+        this.id,
+        new PartitionIdentifier(this.partitionKey, this.branchId),
+        ImmutableSet.of(new FsWriterMetrics.FileInfo(this.outputFile.getName(), recordsWritten()))
+    );
+    this.properties.setProp(FS_WRITER_METRICS_KEY, metrics.toJson());
  }
 
   /**
@@ -285,20 +300,6 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
   public void close()
       throws IOException {
     this.closer.close();
-
-    if (this.shouldIncludeRecordCountInFileName) {
-      String filePathWithRecordCount = addRecordCountToFileName();
-      this.properties.appendToSetProp(this.allOutputFilesPropName, filePathWithRecordCount);
-    } else {
-      this.properties.appendToSetProp(this.allOutputFilesPropName, getOutputFilePath());
-    }
-
-    FsWriterMetrics metrics = new FsWriterMetrics(
-        this.id,
-        new PartitionIdentifier(this.partitionKey, this.branchId),
-        ImmutableSet.of(new FsWriterMetrics.FileInfo(this.outputFile.getName(), recordsWritten()))
-    );
-    this.properties.setProp(FS_WRITER_METRICS_KEY, metrics.toJson());
   }
 
   private synchronized String addRecordCountToFileName()

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60d4e61d/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
index b1a3046..1145920 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
@@ -44,6 +44,8 @@ public class HiveWritableHdfsDataWriter extends FsDataWriter<Writable> {
 
   protected final RecordWriter writer;
   protected final AtomicLong count = new AtomicLong(0);
+  // the close method may be invoked multiple times, but the underlying writer only supports close being called once
+  private boolean closed = false;
 
   public HiveWritableHdfsDataWriter(HiveWritableHdfsDataWriterBuilder<?> builder, State properties) throws IOException {
     super(builder, properties);
@@ -92,8 +94,24 @@ public class HiveWritableHdfsDataWriter extends FsDataWriter<Writable> {
   }
 
   @Override
+  public void close() throws IOException {
+    // close the underlying writer if not already closed. The close can only be called once for the underlying writer,
+    // so remember the state
+    if (!this.closed) {
+      this.writer.close(false);
+      this.closed = true;
+    }
+
+    super.close();
+  }
+
+  @Override
   public void commit() throws IOException {
-    this.writer.close(false);
+    if (!this.closed) {
+      this.writer.close(false);
+      this.closed = true;
+    }
+
     super.commit();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60d4e61d/gobblin-core/src/test/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterTest.java
new file mode 100644
index 0000000..5fb5ee4
--- /dev/null
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+
+public class HiveWritableHdfsDataWriterTest {
+  private FileSystem fs;
+  private File tmpDir;
+
+  @BeforeClass
+  public void setUp() throws IOException {
+    tmpDir = Files.createTempDir();
+    this.fs = FileSystem.get(new Configuration());
+  }
+
+  @AfterClass
+  public void cleanUp() throws IOException {
+    if (this.fs.exists(new Path(this.tmpDir.getAbsolutePath()))) {
+      if (!this.fs.delete(new Path(this.tmpDir.getAbsolutePath()), true)) {
+        throw new IOException("Failed to clean up path " + this.tmpDir);
+      }
+    }
+  }
+
+  /**
+   * Test that multiple close calls do not raise an error
+   */
+  @Test
+  public void testMultipleClose() throws IOException {
+    Properties properties = new Properties();
+    properties.load(new FileReader("gobblin-core/src/test/resources/writer/hive_writer.properties"));
+
+    properties.setProperty("writer.staging.dir", new Path(tmpDir.getAbsolutePath(), "output-staging").toString());
+    properties.setProperty("writer.output.dir", new Path(tmpDir.getAbsolutePath(), "output").toString());
+    properties.setProperty("writer.file.path", ".");
+
+    SourceState sourceState = new SourceState(new State(properties), ImmutableList.<WorkUnitState> of());
+
+    DataWriter writer = new HiveWritableHdfsDataWriterBuilder<>().withBranches(1)
+            .withWriterId("0").writeTo(Destination.of(Destination.DestinationType.HDFS, sourceState))
+            .writeInFormat(WriterOutputFormat.ORC).build();
+
+    writer.close();
+    // check for file existence
+    Assert.assertTrue(this.fs.exists(new Path(new Path(tmpDir.getAbsolutePath(), "output-staging"), "writer-output.orc")),
+        "staging file not found");
+
+    // closing again is okay
+    writer.close();
+    // commit after close is okay
+    writer.commit();
+    Assert.assertTrue(this.fs.exists(new Path(new Path(tmpDir.getAbsolutePath(), "output"), "writer-output.orc")),
+        "output file not found");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60d4e61d/gobblin-core/src/test/resources/writer/hive_writer.properties
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/writer/hive_writer.properties b/gobblin-core/src/test/resources/writer/hive_writer.properties
new file mode 100644
index 0000000..38edf4a
--- /dev/null
+++ b/gobblin-core/src/test/resources/writer/hive_writer.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+serde.serializer.type=ORC
+serde.deserializer.type=AVRO
+writer.file.name=writer-output.orc
+