You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text rendering.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2020/06/30 23:45:14 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1207] Clear references to potentially large objects in Fork, FileBasedExtractor, and HiveWritableHdfsDataWriter

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f99b612  [GOBBLIN-1207] Clear references to potentially large objects in Fork, FileBasedExtractor, and HiveWritableHdfsDataWriter
f99b612 is described below

commit f99b612ab59e1fc6a438b693273bc7295b9c99d1
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Tue Jun 30 16:45:05 2020 -0700

    [GOBBLIN-1207] Clear references to potentially large objects in Fork, FileBasedExtractor, and HiveWritableHdfsDataWriter
    
    Closes #3052 from htran1/fork_writer_close
---
 .../gobblin/configuration/ConfigurationKeys.java   |   3 +
 .../extractor/filebased/FileBasedExtractor.java    |   2 +
 .../gobblin/writer/HiveWritableHdfsDataWriter.java |  21 ++--
 .../java/org/apache/gobblin/runtime/fork/Fork.java |   9 ++
 .../org/apache/gobblin/runtime/fork/ForkTest.java  | 129 +++++++++++++++++++++
 5 files changed, 154 insertions(+), 10 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 424424b..29ad2d4 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -349,6 +349,9 @@ public class ConfigurationKeys {
   public static final String DEFAULT_FORK_RECORD_QUEUE_TIMEOUT_UNIT = TimeUnit.MILLISECONDS.name();
   public static final String FORK_MAX_WAIT_MININUTES = "fork.max.wait.minutes";
   public static final long DEFAULT_FORK_MAX_WAIT_MININUTES = 60;
+  public static final String FORK_CLOSE_WRITER_ON_COMPLETION = "fork.closeWriterOnCompletion";
+  public static final boolean DEFAULT_FORK_CLOSE_WRITER_ON_COMPLETION = true;
+
 
   /**
    * Writer configuration properties.
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedExtractor.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedExtractor.java
index a8becca..2e40103 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedExtractor.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedExtractor.java
@@ -160,6 +160,8 @@ public class FileBasedExtractor<S, D> extends InstrumentedExtractor<S, D> {
     if (this.currentFile != null && this.currentFileItr != null) {
       closeCurrentFile();
       incrementBytesReadCounter();
+      // release the reference to allow garbage collection
+      this.currentFileItr = null;
     }
 
     while (!this.hasNext && !this.filesToPull.isEmpty()) {
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
index a83dc05..2aab198 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
@@ -42,7 +42,7 @@ import org.apache.gobblin.configuration.State;
  */
 public class HiveWritableHdfsDataWriter extends FsDataWriter<Writable> {
 
-  protected final RecordWriter writer;
+  protected RecordWriter writer;
   protected final AtomicLong count = new AtomicLong(0);
   // the close method may be invoked multiple times, but the underlying writer only supports close being called once
   private boolean closed = false;
@@ -101,24 +101,25 @@ public class HiveWritableHdfsDataWriter extends FsDataWriter<Writable> {
 
   @Override
   public void close() throws IOException {
-    // close the underlying writer if not already closed. The close can only be called once for the underlying writer,
-    // so remember the state
-    if (!this.closed) {
-      this.writer.close(false);
-      this.closed = true;
-    }
-
+    closeInternal();
     super.close();
   }
 
   @Override
   public void commit() throws IOException {
+    closeInternal();
+    super.commit();
+  }
+
+  private void closeInternal() throws IOException {
+    // close the underlying writer if not already closed. The close can only be called once for the underlying writer,
+    // so remember the state
     if (!this.closed) {
       this.writer.close(false);
+      // release reference to allow GC since this writer can hold onto large buffers for some formats like ORC.
+      this.writer = null;
       this.closed = true;
     }
-
-    super.commit();
   }
 
   @Override
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index 138185c..706d8f2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -248,6 +248,15 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
     compareAndSetForkState(ForkState.PENDING, ForkState.RUNNING);
     try {
       processRecords();
+
+      // Close the writer now if configured, to release memory held by writers with large buffers (e.g. ORC) before
+      // commit. This is enabled by default (DEFAULT_FORK_CLOSE_WRITER_ON_COMPLETION is true); set
+      // fork.closeWriterOnCompletion=false for writers that rely on staying open until commit.
+      if (this.writer.isPresent() && taskContext.getTaskState().getPropAsBoolean(
+        ConfigurationKeys.FORK_CLOSE_WRITER_ON_COMPLETION, ConfigurationKeys.DEFAULT_FORK_CLOSE_WRITER_ON_COMPLETION)) {
+        this.writer.get().close();
+      }
+
       compareAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED);
     } catch (Throwable t) {
       // Set throwable to holder first because AsynchronousFork::putRecord can pull the throwable when it detects ForkState.FAILED status.
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/fork/ForkTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/fork/ForkTest.java
new file mode 100644
index 0000000..a5e9d13
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/fork/ForkTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.fork;
+
+import java.io.IOException;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.runtime.ExecutionModel;
+import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.DataWriterBuilder;
+import org.apache.gobblin.writer.RetryWriter;
+import org.junit.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class ForkTest {
+  @DataProvider(name = "closeConfigProvider")
+  public static Object[][] closeConfigProvider() {
+    // {fork.closeWriterOnCompletion value, expected number of writer close() calls}
+    return new Object[][]{{"true", 1}, {"false", 0}};
+  }
+
+  @Test(dataProvider = "closeConfigProvider")
+  public void TestForCloseWriterTrue(String closeConfig, int expectedCloseCount) throws Exception {
+    WorkUnitState wus = new WorkUnitState();
+    wus.setProp(ConfigurationKeys.FORK_CLOSE_WRITER_ON_COMPLETION, closeConfig);
+    wus.setProp(ConfigurationKeys.JOB_ID_KEY, "job2");
+    wus.setProp(ConfigurationKeys.TASK_ID_KEY, "task1");
+    wus.setProp(RetryWriter.RETRY_WRITER_ENABLED, "false");
+    wus.setProp(ConfigurationKeys.WRITER_EAGER_INITIALIZATION_KEY, "true");
+    wus.setProp(ConfigurationKeys.WRITER_BUILDER_CLASS, DummyDataWriterBuilder.class.getName());
+
+    TaskContext taskContext = new TaskContext(wus);
+    Fork testFork = new TestFork(taskContext, null, 0, 0, ExecutionModel.BATCH);
+
+    Assert.assertNotNull(testFork.getWriter());
+
+    testFork.run();
+
+    Assert.assertEquals(expectedCloseCount, DummyDataWriterBuilder.getWriter().getCloseCount());
+  }
+
+  private static class TestFork extends Fork {
+
+    public TestFork(TaskContext taskContext, Object schema, int branches, int index, ExecutionModel executionModel)
+        throws Exception {
+      super(taskContext, schema, branches, index, executionModel);
+    }
+
+    @Override
+    protected void processRecords() throws IOException, DataConversionException {
+    }
+
+    @Override
+    protected boolean putRecordImpl(Object record) throws InterruptedException {
+      return true;
+    }
+  }
+
+  public static class DummyDataWriterBuilder extends DataWriterBuilder<String, Integer> {
+    private static ThreadLocal<DummyWriter> myThreadLocal = ThreadLocal.withInitial(() -> new DummyWriter());
+
+    @Override
+    public DataWriter<Integer> build() throws IOException {
+      getWriter().setCloseCount(0);
+      return getWriter();
+    }
+
+    public static DummyWriter getWriter() {
+      return myThreadLocal.get();
+    }
+  }
+
+  private static class DummyWriter implements DataWriter<Integer> {
+    @Getter
+    @Setter
+    private int closeCount = 0;
+
+    DummyWriter() {
+    }
+
+    @Override
+    public void write(Integer record) throws IOException {
+    }
+
+    @Override
+    public void commit() throws IOException {
+    }
+
+    @Override
+    public void cleanup() throws IOException {
+    }
+
+    @Override
+    public long recordsWritten() {
+      return 0;
+    }
+
+    @Override
+    public long bytesWritten() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+      this.closeCount++;
+    }
+  }
+}