You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2022/10/01 02:21:07 UTC

[GitHub] [gobblin] homatthew commented on a diff in pull request #3574: GOBBLIN-1715: Support for Vectorized row batch pooling

homatthew commented on code in PR #3574:
URL: https://github.com/apache/gobblin/pull/3574#discussion_r985012096


##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
+    static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
+
+    private static RowBatchPool INSTANCE;
+
+    private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
+    private final ScheduledExecutorService rowBatchExpiryThread;
+    private final long rowBatchExpiryInterval;
+
+    private RowBatchPool(State properties) {
+        rowBatches = Maps.newHashMap();
+        rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setDaemon(true).build());
+        // expire row batches older N secs
+        rowBatchExpiryInterval = properties.getPropAsLong(ROW_BATCH_EXPIRY_INTERVAL, 10);
+        // check every N secs
+        long rowBatchExpiryPeriod = properties.getPropAsLong(ROW_BATCH_EXPIRY_PERIOD, 1);

Review Comment:
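   You forgot to replace these hard-coded values with the default-value constants declared above (`ROW_BATCH_EXPIRY_INTERVAL_DEFAULT` and `ROW_BATCH_EXPIRY_PERIOD_DEFAULT`).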



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -153,14 +157,18 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
 
     // Create value-writer which is essentially a record-by-record-converter with buffering in batch.
     this.inputSchema = builder.getSchema();
-    TypeDescription typeDescription = getOrcSchema();
+    this.typeDescription = getOrcSchema();
     this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties);
     this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
-    this.rowBatch = typeDescription.createRowBatch(this.batchSize);
+    this.rowBatchPool = RowBatchPool.instance(properties);

Review Comment:
   Is GobblinBaseOrcWriter created / GC'ed often during the lifetime of the pipeline, or is it just created once at the start of the pipeline? Not sure if it's overkill, but if the recycling is frequent enough, it doesn't hurt to use https://www.baeldung.com/java-singleton-double-checked-locking instead.



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
+    static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
+
+    private static RowBatchPool INSTANCE;
+
+    private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
+    private final ScheduledExecutorService rowBatchExpiryThread;
+    private final long rowBatchExpiryInterval;
+
+    private RowBatchPool(State properties) {
+        rowBatches = Maps.newHashMap();
+        rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setDaemon(true).build());
+        // expire row batches older N secs
+        rowBatchExpiryInterval = properties.getPropAsLong(ROW_BATCH_EXPIRY_INTERVAL, 10);
+        // check every N secs
+        long rowBatchExpiryPeriod = properties.getPropAsLong(ROW_BATCH_EXPIRY_PERIOD, 1);
+        rowBatchExpiryThread.scheduleAtFixedRate(
+                rowBatchExpiryFn(), rowBatchExpiryPeriod, rowBatchExpiryPeriod, TimeUnit.SECONDS);
+    }
+
+    private Runnable rowBatchExpiryFn() {
+        return () -> {
+            synchronized (rowBatches) {
+                for (Map.Entry<TypeDescription, LinkedList<RowBatchHolder>> e : rowBatches.entrySet()) {
+                    LinkedList<RowBatchHolder> val = e.getValue();
+                    val.removeIf(this::candidateForRemoval);
+                }
+            }
+        };
+    }
+
+    private boolean candidateForRemoval(RowBatchHolder batch) {
+        long expiryInterval = TimeUnit.SECONDS.toMillis(rowBatchExpiryInterval);
+        long interval = System.currentTimeMillis() - batch.lastUsed;
+        if (interval > expiryInterval) {
+            log.info("Expiring row batch {} as it has not been accessed since {} ms",
+                    System.identityHashCode(batch.rowBatch), interval);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private static class RowBatchHolder {
+        long lastUsed;
+        VectorizedRowBatch rowBatch;
+
+        private RowBatchHolder(VectorizedRowBatch rowBatch, long currentTimeMillis) {
+            this.rowBatch = rowBatch;
+            this.lastUsed = currentTimeMillis;
+        }
+    }
+
+    public synchronized static RowBatchPool instance(State properties) {
+        if (INSTANCE == null) {
+            INSTANCE = new RowBatchPool(properties);
+        }
+        return INSTANCE;
+    }
+
+    public VectorizedRowBatch getRowBatch(TypeDescription schema, int batchSize) {
+        synchronized (rowBatches) {
+            LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
+            VectorizedRowBatch rowBatch;
+            if (vals == null || vals.size() == 0) {
+                rowBatch = schema.createRowBatch(batchSize);
+                log.info("Creating new row batch {}", System.identityHashCode(rowBatch));
+            } else {
+                rowBatch = vals.removeLast().rowBatch;
+                log.info("Using existing row batch {}", System.identityHashCode(rowBatch));
+            }
+            return rowBatch;
+        }
+    }
+
+    public void recycle(TypeDescription schema, VectorizedRowBatch rowBatch) {
+        log.info("Recycling row batch {}", System.identityHashCode(rowBatch));
+        synchronized (rowBatches) {
+            LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
+            if (vals == null) {

Review Comment:
   Any preference between `computeIfAbsent` and an explicit null check?



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
+    static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
+

Review Comment:
   Really minor style nits. Typically we follow a pattern of defining a shared prefix variable, e.g. `orc.row.batch.expiry.` or even just `orc.row.batch.`, and then we name the default-value constants with a `DEFAULT_` prefix.
   
   See https://github.com/apache/gobblin/blob/3733d6028c437e18eff349ba56d8264a56d4673f/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java#L109-L110 for an example. 
   
   Also, maybe add the word "pool" to these settings, since they are specific to the batch pool and not to regular row batches?



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -153,14 +157,18 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
 
     // Create value-writer which is essentially a record-by-record-converter with buffering in batch.
     this.inputSchema = builder.getSchema();
-    TypeDescription typeDescription = getOrcSchema();
+    this.typeDescription = getOrcSchema();
     this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties);
     this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
-    this.rowBatch = typeDescription.createRowBatch(this.batchSize);
+    this.rowBatchPool = RowBatchPool.instance(properties);
+    this.enableRowBatchPool = properties.getPropAsBoolean(RowBatchPool.ENABLE_ROW_BATCH_POOL, true);

Review Comment:
   The default should probably be a separate variable. Did we plan a gradual rollout? If so, a default of false may be preferable until we are very certain about the performance of this feature across all topics.



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
+    static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
+
+    private static RowBatchPool INSTANCE;
+
+    private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
+    private final ScheduledExecutorService rowBatchExpiryThread;
+    private final long rowBatchExpiryInterval;
+
+    private RowBatchPool(State properties) {
+        rowBatches = Maps.newHashMap();
+        rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setDaemon(true).build());
+        // expire row batches older N secs

Review Comment:
   I wonder if it would be clearer to have these comments as docstrings where the fields are declared. I'm fairly indifferent, since these are all private variables anyway, but if this class grows in size / complexity, that would be a better spot.



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";

Review Comment:
   Also, naming-wise it's a little confusing: "interval" and "period" are near-synonyms, so someone reading the config will likely be confused unless they also read the code. Here are some ideas:
   
   ```
   orc.row.batch.pool.checkExpiryIntervalSeconds
   orc.row.batch.pool.expiryTTLSeconds
   ```



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -235,6 +243,9 @@ private synchronized void closeInternal()
       this.flush();
       this.orcFileWriter.close();
       this.closed = true;
+      if (enableRowBatchPool) {

Review Comment:
   How often does this occur?



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -269,6 +280,7 @@ public void commit()
   @Override
   public void write(D record)
       throws IOException {
+    Preconditions.checkState(!closed, "Writer already closed");

Review Comment:
   When did you see this edge case happen? And does this cause the fork to immediately terminate?



##########
gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/RowBatchPoolTest.java:
##########
@@ -0,0 +1,36 @@
+package org.apache.gobblin.writer;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class RowBatchPoolTest {
+    @Test
+    public void testExpiry() throws Exception {
+        State state = WorkUnit.createEmpty();
+        RowBatchPool instance = RowBatchPool.instance(state);
+        TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");
+        VectorizedRowBatch rowBatch1 = instance.getRowBatch(schema, 1024);
+        instance.recycle(schema, rowBatch1);
+        VectorizedRowBatch rowBatch2 = instance.getRowBatch(schema, 1024);
+        // existing rowbatch is fetched from pool
+        Assert.assertEquals(rowBatch1, rowBatch2);
+
+        // since the pool has no existing rowbatch, a new one is created
+        VectorizedRowBatch rowBatch3 = instance.getRowBatch(schema, 1024);
+        Assert.assertNotEquals(rowBatch1, rowBatch3);
+
+        // recyle fetched rowbatches
+        instance.recycle(schema, rowBatch2);
+        instance.recycle(schema, rowBatch3);
+
+        // wait for their expiry
+        Thread.sleep(RowBatchPool.ROW_BATCH_EXPIRY_INTERVAL_DEFAULT * 1000L);

Review Comment:
   In general, `Thread.sleep` in unit tests is bad for our CI in the long term. We should rely on a Clock-like object to simulate elapsed time. Then we can mock the passage of time instead of wasting 10 seconds doing nothing.



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
+    static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
+
+    private static RowBatchPool INSTANCE;
+
+    private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
+    private final ScheduledExecutorService rowBatchExpiryThread;
+    private final long rowBatchExpiryInterval;
+
+    private RowBatchPool(State properties) {
+        rowBatches = Maps.newHashMap();
+        rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setDaemon(true).build());
+        // expire row batches older N secs
+        rowBatchExpiryInterval = properties.getPropAsLong(ROW_BATCH_EXPIRY_INTERVAL, 10);
+        // check every N secs
+        long rowBatchExpiryPeriod = properties.getPropAsLong(ROW_BATCH_EXPIRY_PERIOD, 1);
+        rowBatchExpiryThread.scheduleAtFixedRate(
+                rowBatchExpiryFn(), rowBatchExpiryPeriod, rowBatchExpiryPeriod, TimeUnit.SECONDS);
+    }
+
+    private Runnable rowBatchExpiryFn() {
+        return () -> {
+            synchronized (rowBatches) {
+                for (Map.Entry<TypeDescription, LinkedList<RowBatchHolder>> e : rowBatches.entrySet()) {
+                    LinkedList<RowBatchHolder> val = e.getValue();
+                    val.removeIf(this::candidateForRemoval);
+                }
+            }
+        };
+    }
+
+    private boolean candidateForRemoval(RowBatchHolder batch) {
+        long expiryInterval = TimeUnit.SECONDS.toMillis(rowBatchExpiryInterval);
+        long interval = System.currentTimeMillis() - batch.lastUsed;
+        if (interval > expiryInterval) {
+            log.info("Expiring row batch {} as it has not been accessed since {} ms",
+                    System.identityHashCode(batch.rowBatch), interval);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private static class RowBatchHolder {
+        long lastUsed;
+        VectorizedRowBatch rowBatch;
+
+        private RowBatchHolder(VectorizedRowBatch rowBatch, long currentTimeMillis) {
+            this.rowBatch = rowBatch;
+            this.lastUsed = currentTimeMillis;
+        }
+    }
+
+    public synchronized static RowBatchPool instance(State properties) {
+        if (INSTANCE == null) {
+            INSTANCE = new RowBatchPool(properties);
+        }
+        return INSTANCE;
+    }
+
+    public VectorizedRowBatch getRowBatch(TypeDescription schema, int batchSize) {
+        synchronized (rowBatches) {
+            LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
+            VectorizedRowBatch rowBatch;
+            if (vals == null || vals.size() == 0) {

Review Comment:
   Dumb question, but do we have a preference between Apache Commons `CollectionUtils.isEmpty` and a basic null check like this?



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -153,14 +157,18 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
 
     // Create value-writer which is essentially a record-by-record-converter with buffering in batch.
     this.inputSchema = builder.getSchema();
-    TypeDescription typeDescription = getOrcSchema();
+    this.typeDescription = getOrcSchema();
     this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties);
     this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
-    this.rowBatch = typeDescription.createRowBatch(this.batchSize);
+    this.rowBatchPool = RowBatchPool.instance(properties);

Review Comment:
   I did some digging, and it seems like we use the GobblinOrcWriter in 2 spots. 
   1. [Fork](https://github.com/apache/gobblin/blob/022b49fdd99cebcc21aad9b6b7cde9df0b092537/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java#L249)
   2. [AbstractJobLauncher](https://github.com/apache/gobblin/blob/b726a606cea3deb567b1fdeeba9acbcc220e6d30/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java#L516)
   
   In (1) we init a writer per fork, and we may have multiple forks in a Task, especially in the streaming-model task runner. From my understanding, a fork is long-running, so we only create a new ORC writer during the initial run of a task and on retries.
   In (2) we init once during the job launch. 
   
   I think I am missing something here, though, because we wouldn't need to recycle the row batches via a pool if they were created once and then forgotten. 🤔 
   



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -153,14 +157,18 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
 
     // Create value-writer which is essentially a record-by-record-converter with buffering in batch.
     this.inputSchema = builder.getSchema();
-    TypeDescription typeDescription = getOrcSchema();
+    this.typeDescription = getOrcSchema();
     this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties);
     this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
-    this.rowBatch = typeDescription.createRowBatch(this.batchSize);
+    this.rowBatchPool = RowBatchPool.instance(properties);
+    this.enableRowBatchPool = properties.getPropAsBoolean(RowBatchPool.ENABLE_ROW_BATCH_POOL, true);
+    this.rowBatch = enableRowBatchPool ? rowBatchPool.getRowBatch(typeDescription, batchSize) : typeDescription.createRowBatch(batchSize);
     this.deepCleanBatch = properties.getPropAsBoolean(ORC_WRITER_DEEP_CLEAN_EVERY_BATCH, false);
 
     log.info("Created ORC writer, batch size: {}, {}: {}",
-            batchSize, OrcConf.ROWS_BETWEEN_CHECKS.name(), properties.getProp(OrcConf.ROWS_BETWEEN_CHECKS.name(),
+            batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),

Review Comment:
   Is the reason for this change that `attribute` is more general, while `name` is specific to Hive? I noticed that for this specific enum value they are the same string.



##########
gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/RowBatchPoolTest.java:
##########
@@ -0,0 +1,36 @@
+package org.apache.gobblin.writer;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class RowBatchPoolTest {
+    @Test
+    public void testExpiry() throws Exception {
+        State state = WorkUnit.createEmpty();
+        RowBatchPool instance = RowBatchPool.instance(state);
+        TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");
+        VectorizedRowBatch rowBatch1 = instance.getRowBatch(schema, 1024);
+        instance.recycle(schema, rowBatch1);
+        VectorizedRowBatch rowBatch2 = instance.getRowBatch(schema, 1024);
+        // existing rowbatch is fetched from pool
+        Assert.assertEquals(rowBatch1, rowBatch2);
+
+        // since the pool has no existing rowbatch, a new one is created
+        VectorizedRowBatch rowBatch3 = instance.getRowBatch(schema, 1024);
+        Assert.assertNotEquals(rowBatch1, rowBatch3);
+
+        // recyle fetched rowbatches
+        instance.recycle(schema, rowBatch2);
+        instance.recycle(schema, rowBatch3);
+
+        // wait for their expiry
+        Thread.sleep(RowBatchPool.ROW_BATCH_EXPIRY_INTERVAL_DEFAULT * 1000L);

Review Comment:
   It's slightly different because I used a Stopwatch instead of a Clock, but here's the idea in case you're interested. Replace the Stopwatch with a Clock and you should get a similar result:
   
   https://github.com/homatthew/gobblin/blob/816d5f969fb8142a5f8d927288778840f82889ba/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetricsTest.java



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
+    static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
+
+    private static RowBatchPool INSTANCE;
+
+    private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
+    private final ScheduledExecutorService rowBatchExpiryThread;
+    private final long rowBatchExpiryInterval;
+
+    private RowBatchPool(State properties) {
+        rowBatches = Maps.newHashMap();
+        rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setDaemon(true).build());
+        // expire row batches older N secs
+        rowBatchExpiryInterval = properties.getPropAsLong(ROW_BATCH_EXPIRY_INTERVAL, 10);
+        // check every N secs
+        long rowBatchExpiryPeriod = properties.getPropAsLong(ROW_BATCH_EXPIRY_PERIOD, 1);
+        rowBatchExpiryThread.scheduleAtFixedRate(
+                rowBatchExpiryFn(), rowBatchExpiryPeriod, rowBatchExpiryPeriod, TimeUnit.SECONDS);
+    }
+
+    private Runnable rowBatchExpiryFn() {
+        return () -> {
+            synchronized (rowBatches) {
+                for (Map.Entry<TypeDescription, LinkedList<RowBatchHolder>> e : rowBatches.entrySet()) {
+                    LinkedList<RowBatchHolder> val = e.getValue();
+                    val.removeIf(this::candidateForRemoval);
+                }
+            }
+        };
+    }
+
+    private boolean candidateForRemoval(RowBatchHolder batch) {
+        long expiryInterval = TimeUnit.SECONDS.toMillis(rowBatchExpiryInterval);
+        long interval = System.currentTimeMillis() - batch.lastUsed;
+        if (interval > expiryInterval) {
+            log.info("Expiring row batch {} as it has not been accessed since {} ms",
+                    System.identityHashCode(batch.rowBatch), interval);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private static class RowBatchHolder {
+        long lastUsed;
+        VectorizedRowBatch rowBatch;
+
+        private RowBatchHolder(VectorizedRowBatch rowBatch, long currentTimeMillis) {
+            this.rowBatch = rowBatch;
+            this.lastUsed = currentTimeMillis;
+        }
+    }
+
+    public synchronized static RowBatchPool instance(State properties) {
+        if (INSTANCE == null) {
+            INSTANCE = new RowBatchPool(properties);
+        }
+        return INSTANCE;
+    }
+
+    public VectorizedRowBatch getRowBatch(TypeDescription schema, int batchSize) {
+        synchronized (rowBatches) {
+            LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
+            VectorizedRowBatch rowBatch;
+            if (vals == null || vals.size() == 0) {
+                rowBatch = schema.createRowBatch(batchSize);
+                log.info("Creating new row batch {}", System.identityHashCode(rowBatch));
+            } else {
+                rowBatch = vals.removeLast().rowBatch;

Review Comment:
   What's the reason for using the last one rather than the first one? My guess is that LIFO ordering generally minimizes the number of live objects compared to FIFO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org