You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/10/11 20:36:13 UTC

[gobblin] branch master updated: GOBBLIN-1715: Support vectorized row batch pooling (#3574)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new fafd40b25 GOBBLIN-1715: Support vectorized row batch pooling (#3574)
fafd40b25 is described below

commit fafd40b25fdd3e8f8299f9e939794ad7a2492314
Author: Ratandeep Ratti <rd...@gmail.com>
AuthorDate: Tue Oct 11 13:36:06 2022 -0700

    GOBBLIN-1715: Support vectorized row batch pooling (#3574)
---
 .../gobblin/writer/GobblinBaseOrcWriter.java       |  22 +++-
 .../org/apache/gobblin/writer/RowBatchPool.java    | 131 +++++++++++++++++++++
 .../apache/gobblin/writer/RowBatchPoolTest.java    |  53 +++++++++
 3 files changed, 201 insertions(+), 5 deletions(-)

diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
index b6400d59b..b05c3af2b 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.writer;
 import java.io.IOException;
 import java.util.Properties;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.orc.OrcConf;
@@ -90,8 +91,11 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
 
   private final OrcValueWriter<D> valueWriter;
   @VisibleForTesting
-  final VectorizedRowBatch rowBatch;
+  VectorizedRowBatch rowBatch;
+  private final TypeDescription typeDescription;
   private final Writer orcFileWriter;
+  private final RowBatchPool rowBatchPool;
+  private final boolean enableRowBatchPool;
 
   // the close method may be invoked multiple times, but the underlying writer only supports close being called once
   private volatile boolean closed = false;
@@ -101,7 +105,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
   protected final S inputSchema;
 
   /**
-   * There are couple of parameters in ORC writer that requires manual tuning based on record size given that executor
+   * There are a couple of parameters in ORC writer that requires manual tuning based on record size given that executor
    * for running these ORC writers has limited heap space. This helper function wrap them and has side effect for the
    * argument {@param properties}.
    *
@@ -153,14 +157,18 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
 
     // Create value-writer which is essentially a record-by-record-converter with buffering in batch.
     this.inputSchema = builder.getSchema();
-    TypeDescription typeDescription = getOrcSchema();
+    this.typeDescription = getOrcSchema();
     this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties);
     this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
-    this.rowBatch = typeDescription.createRowBatch(this.batchSize);
+    this.rowBatchPool = RowBatchPool.instance(properties);
+    this.enableRowBatchPool = properties.getPropAsBoolean(RowBatchPool.ENABLE_ROW_BATCH_POOL, false);
+    this.rowBatch = enableRowBatchPool ? rowBatchPool.getRowBatch(typeDescription, batchSize) : typeDescription.createRowBatch(batchSize);
     this.deepCleanBatch = properties.getPropAsBoolean(ORC_WRITER_DEEP_CLEAN_EVERY_BATCH, false);
 
     log.info("Created ORC writer, batch size: {}, {}: {}",
-            batchSize, OrcConf.ROWS_BETWEEN_CHECKS.name(), properties.getProp(OrcConf.ROWS_BETWEEN_CHECKS.name(),
+            batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
+            properties.getProp(
+                    OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
                     OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue().toString()));
 
     // Create file-writer
@@ -235,6 +243,9 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
       this.flush();
       this.orcFileWriter.close();
       this.closed = true;
+      if (enableRowBatchPool) {
+        rowBatchPool.recycle(typeDescription, rowBatch);
+      }
     } else {
       // Throw fatal exception if there's outstanding buffered data since there's risk losing data if proceeds.
       if (rowBatch.size > 0) {
@@ -269,6 +280,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
   @Override
   public void write(D record)
       throws IOException {
+    Preconditions.checkState(!closed, "Writer already closed");
     valueWriter.write(record, rowBatch);
     if (rowBatch.size == this.batchSize) {
       orcFileWriter.addRowBatch(rowBatch);
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java
new file mode 100644
index 000000000..38231a9ed
--- /dev/null
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/***
+ * Maintains a pool of row batches per orc schema.
+ * Expires row batches which have not been accessed for {@code ROW_BATCH_EXPIRY_INTERVAL}
+ */
+@Slf4j
+public class RowBatchPool {
+    static final String PREFIX = "orc.row.batch.";
+    static final String ENABLE_ROW_BATCH_POOL = PREFIX + "enable";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = PREFIX + "expiry.interval.secs";
+    static final int DEFAULT_ROW_BATCH_EXPIRY_INTERVAL = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = PREFIX + "expiry.period.secs";
+    static final int DEFAULT_ROW_BATCH_EXPIRY_PERIOD = 1;
+
+    private static RowBatchPool INSTANCE;
+
+    private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
+    private final ScheduledExecutorService rowBatchExpiryThread;
+    private final long rowBatchExpiryInterval;
+
+    private RowBatchPool(State properties) {
+        rowBatches = Maps.newHashMap();
+        rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setDaemon(true).build());
+        // expire row batches older N secs
+        rowBatchExpiryInterval = properties.getPropAsLong(ROW_BATCH_EXPIRY_INTERVAL, DEFAULT_ROW_BATCH_EXPIRY_INTERVAL);
+        // check every N secs
+        long rowBatchExpiryPeriod = properties.getPropAsLong(ROW_BATCH_EXPIRY_PERIOD, DEFAULT_ROW_BATCH_EXPIRY_PERIOD);
+        rowBatchExpiryThread.scheduleAtFixedRate(
+                rowBatchExpiryFn(), rowBatchExpiryPeriod, rowBatchExpiryPeriod, TimeUnit.SECONDS);
+    }
+
+    private Runnable rowBatchExpiryFn() {
+        return () -> {
+            synchronized (rowBatches) {
+                for (Map.Entry<TypeDescription, LinkedList<RowBatchHolder>> e : rowBatches.entrySet()) {
+                    LinkedList<RowBatchHolder> val = e.getValue();
+                    val.removeIf(this::candidateForRemoval);
+                }
+            }
+        };
+    }
+
+    private boolean candidateForRemoval(RowBatchHolder batch) {
+        long expiryInterval = TimeUnit.SECONDS.toMillis(rowBatchExpiryInterval);
+        long interval = System.currentTimeMillis() - batch.lastUsed;
+        if (interval > expiryInterval) {
+            log.info("Expiring row batch {} as it has not been accessed since {} ms",
+                    System.identityHashCode(batch.rowBatch), interval);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private static class RowBatchHolder {
+        long lastUsed;
+        VectorizedRowBatch rowBatch;
+
+        private RowBatchHolder(VectorizedRowBatch rowBatch, long currentTimeMillis) {
+            this.rowBatch = rowBatch;
+            this.lastUsed = currentTimeMillis;
+        }
+    }
+
+    public synchronized static RowBatchPool instance(State properties) {
+        if (INSTANCE == null) {
+            INSTANCE = new RowBatchPool(properties);
+        }
+        return INSTANCE;
+    }
+
+    public VectorizedRowBatch getRowBatch(TypeDescription schema, int batchSize) {
+        synchronized (rowBatches) {
+            LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
+            VectorizedRowBatch rowBatch;
+
+            if (vals == null || vals.size() == 0) {
+                rowBatch = schema.createRowBatch(batchSize);
+                log.info("Creating new row batch {}", System.identityHashCode(rowBatch));
+            } else {
+                rowBatch = vals.removeLast().rowBatch;
+                log.info("Using existing row batch {}", System.identityHashCode(rowBatch));
+            }
+            return rowBatch;
+        }
+    }
+
+    public void recycle(TypeDescription schema, VectorizedRowBatch rowBatch) {
+        log.info("Recycling row batch {}", System.identityHashCode(rowBatch));
+        synchronized (rowBatches) {
+            rowBatches.computeIfAbsent(schema, ignore -> Lists.newLinkedList());
+            LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
+            vals.add(new RowBatchHolder(rowBatch, System.currentTimeMillis()));
+        }
+    }
+}
diff --git a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/RowBatchPoolTest.java b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/RowBatchPoolTest.java
new file mode 100644
index 000000000..e14989ec6
--- /dev/null
+++ b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/RowBatchPoolTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class RowBatchPoolTest {
+    @Test
+    public void testExpiry() throws Exception {
+        // NOTE: RowBatchPool is a JVM-wide singleton configured by the first caller of instance(); this test
+        // assumes it runs with the default expiry settings.
+        State state = WorkUnit.createEmpty();
+        RowBatchPool instance = RowBatchPool.instance(state);
+        TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");
+        VectorizedRowBatch rowBatch1 = instance.getRowBatch(schema, 1024);
+        instance.recycle(schema, rowBatch1);
+        VectorizedRowBatch rowBatch2 = instance.getRowBatch(schema, 1024);
+        // existing rowbatch is fetched from pool
+        Assert.assertSame(rowBatch2, rowBatch1);
+
+        // since the pool has no existing rowbatch, a new one is created
+        VectorizedRowBatch rowBatch3 = instance.getRowBatch(schema, 1024);
+        Assert.assertNotSame(rowBatch3, rowBatch1);
+
+        // recycle fetched rowbatches
+        instance.recycle(schema, rowBatch2);
+        instance.recycle(schema, rowBatch3);
+
+        // wait for their expiry: batches are dropped only once idle for *more than* the expiry interval, and the
+        // expiry check runs once per expiry period, so wait for the interval plus two periods
+        Thread.sleep((RowBatchPool.DEFAULT_ROW_BATCH_EXPIRY_INTERVAL
+                + 2L * RowBatchPool.DEFAULT_ROW_BATCH_EXPIRY_PERIOD) * 1000L);
+        VectorizedRowBatch rowBatch4 = instance.getRowBatch(schema, 1024);
+        // new rowbatch is created, all old ones are expired. Comparing with rowBatch1 alone is not enough: without
+        // expiry the pool would return rowBatch3, which is also distinct from rowBatch1.
+        Assert.assertNotSame(rowBatch4, rowBatch2);
+        Assert.assertNotSame(rowBatch4, rowBatch3);
+    }
+}