You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/01/25 19:31:01 UTC
incubator-gobblin git commit: [GOBBLIN-671] Close the underlying writer when a HiveWritableHdfsDataWriter is closed
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 55a19bbfe -> 60d4e61d0
[GOBBLIN-671] Close the underlying writer when a HiveWritableHdfsDataWriter is closed
Closes #2541 from htran1/hive_writer_close
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/60d4e61d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/60d4e61d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/60d4e61d
Branch: refs/heads/master
Commit: 60d4e61d09664e4ea2af511a3a53f6cd81d1963a
Parents: 55a19bb
Author: Hung Tran <hu...@linkedin.com>
Authored: Fri Jan 25 11:30:55 2019 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Jan 25 11:30:55 2019 -0800
----------------------------------------------------------------------
.../org/apache/gobblin/writer/FsDataWriter.java | 29 ++++---
.../writer/HiveWritableHdfsDataWriter.java | 20 ++++-
.../writer/HiveWritableHdfsDataWriterTest.java | 89 ++++++++++++++++++++
.../resources/writer/hive_writer.properties | 21 +++++
4 files changed, 144 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60d4e61d/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index d814622..dadb51f 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -261,6 +261,21 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
// For the same reason as deleting the staging file if it already exists, overwrite
// the output file if it already exists to prevent task retry from being blocked.
HadoopUtils.renamePath(this.fileContext, this.stagingFile, this.outputFile, true);
+
+ // The staging file is moved to the output path in commit, so rename to add record count after that
+ if (this.shouldIncludeRecordCountInFileName) {
+ String filePathWithRecordCount = addRecordCountToFileName();
+ this.properties.appendToSetProp(this.allOutputFilesPropName, filePathWithRecordCount);
+ } else {
+ this.properties.appendToSetProp(this.allOutputFilesPropName, getOutputFilePath());
+ }
+
+ FsWriterMetrics metrics = new FsWriterMetrics(
+ this.id,
+ new PartitionIdentifier(this.partitionKey, this.branchId),
+ ImmutableSet.of(new FsWriterMetrics.FileInfo(this.outputFile.getName(), recordsWritten()))
+ );
+ this.properties.setProp(FS_WRITER_METRICS_KEY, metrics.toJson());
}
/**
@@ -285,20 +300,6 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
public void close()
throws IOException {
this.closer.close();
-
- if (this.shouldIncludeRecordCountInFileName) {
- String filePathWithRecordCount = addRecordCountToFileName();
- this.properties.appendToSetProp(this.allOutputFilesPropName, filePathWithRecordCount);
- } else {
- this.properties.appendToSetProp(this.allOutputFilesPropName, getOutputFilePath());
- }
-
- FsWriterMetrics metrics = new FsWriterMetrics(
- this.id,
- new PartitionIdentifier(this.partitionKey, this.branchId),
- ImmutableSet.of(new FsWriterMetrics.FileInfo(this.outputFile.getName(), recordsWritten()))
- );
- this.properties.setProp(FS_WRITER_METRICS_KEY, metrics.toJson());
}
private synchronized String addRecordCountToFileName()
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60d4e61d/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
index b1a3046..1145920 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
@@ -44,6 +44,8 @@ public class HiveWritableHdfsDataWriter extends FsDataWriter<Writable> {
protected final RecordWriter writer;
protected final AtomicLong count = new AtomicLong(0);
+ // the close method may be invoked multiple times, but the underlying writer only supports close being called once
+ private boolean closed = false;
public HiveWritableHdfsDataWriter(HiveWritableHdfsDataWriterBuilder<?> builder, State properties) throws IOException {
super(builder, properties);
@@ -92,8 +94,24 @@ public class HiveWritableHdfsDataWriter extends FsDataWriter<Writable> {
}
@Override
+ public void close() throws IOException {
+ // close the underlying writer if not already closed. The close can only be called once for the underlying writer,
+ // so remember the state
+ if (!this.closed) {
+ this.writer.close(false);
+ this.closed = true;
+ }
+
+ super.close();
+ }
+
+ @Override
public void commit() throws IOException {
- this.writer.close(false);
+ if (!this.closed) {
+ this.writer.close(false);
+ this.closed = true;
+ }
+
super.commit();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60d4e61d/gobblin-core/src/test/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterTest.java
new file mode 100644
index 0000000..5fb5ee4
--- /dev/null
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+
+public class HiveWritableHdfsDataWriterTest {
+ private FileSystem fs;
+ private File tmpDir;
+
+ @BeforeClass
+ public void setUp() throws IOException {
+ tmpDir = Files.createTempDir();
+ this.fs = FileSystem.get(new Configuration());
+ }
+
+ @AfterClass
+ public void cleanUp() throws IOException {
+ if (this.fs.exists(new Path(this.tmpDir.getAbsolutePath()))) {
+ if (!this.fs.delete(new Path(this.tmpDir.getAbsolutePath()), true)) {
+ throw new IOException("Failed to clean up path " + this.tmpDir);
+ }
+ }
+ }
+
+ /**
+ * Test that multiple close calls do not raise an error
+ */
+ @Test
+ public void testMultipleClose() throws IOException {
+ Properties properties = new Properties();
+ properties.load(new FileReader("gobblin-core/src/test/resources/writer/hive_writer.properties"));
+
+ properties.setProperty("writer.staging.dir", new Path(tmpDir.getAbsolutePath(), "output-staging").toString());
+ properties.setProperty("writer.output.dir", new Path(tmpDir.getAbsolutePath(), "output").toString());
+ properties.setProperty("writer.file.path", ".");
+
+ SourceState sourceState = new SourceState(new State(properties), ImmutableList.<WorkUnitState> of());
+
+ DataWriter writer = new HiveWritableHdfsDataWriterBuilder<>().withBranches(1)
+ .withWriterId("0").writeTo(Destination.of(Destination.DestinationType.HDFS, sourceState))
+ .writeInFormat(WriterOutputFormat.ORC).build();
+
+ writer.close();
+ // check for file existence
+ Assert.assertTrue(this.fs.exists(new Path(new Path(tmpDir.getAbsolutePath(), "output-staging"), "writer-output.orc")),
+ "staging file not found");
+
+ // closed again is okay
+ writer.close();
+ // commit after close is okay
+ writer.commit();
+ Assert.assertTrue(this.fs.exists(new Path(new Path(tmpDir.getAbsolutePath(), "output"), "writer-output.orc")),
+ "output file not found");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60d4e61d/gobblin-core/src/test/resources/writer/hive_writer.properties
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/writer/hive_writer.properties b/gobblin-core/src/test/resources/writer/hive_writer.properties
new file mode 100644
index 0000000..38edf4a
--- /dev/null
+++ b/gobblin-core/src/test/resources/writer/hive_writer.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+serde.serializer.type=ORC
+serde.deserializer.type=AVRO
+writer.file.name=writer-output.orc
+