Posted to commits@hop.apache.org by ha...@apache.org on 2022/02/24 15:41:10 UTC

[hop] branch master updated: HOP-3789 PipelineExecutor: synchronization needed on pipeline change

This is an automated email from the ASF dual-hosted git repository.

hansva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hop.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cc9d79  HOP-3789 PipelineExecutor: synchronization needed on pipeline change
     new bbadaab  Merge pull request #1393 from sramazzina/MEMORYLEAK
0cc9d79 is described below

commit 0cc9d79d90bf33161b5029bd9756da8af3943495
Author: sergio.ramazzina <se...@serasoft.it>
AuthorDate: Thu Feb 24 13:04:01 2022 +0100

    HOP-3789 PipelineExecutor: synchronization needed on pipeline change
---
 .../pipelineexecutor/PipelineExecutor.java         | 243 ++++++++++++---------
 .../pipelineexecutor/PipelineExecutorData.java     |   1 +
 2 files changed, 139 insertions(+), 105 deletions(-)
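
The heart of this change is a ReentrantLock guard that serializes pipeline
execution at group boundaries, so the executor pipeline cannot be swapped
while a previous execution is still in flight. A minimal, self-contained
sketch of that pattern (class and method names here are illustrative, not
the actual Hop API):

    import java.util.concurrent.locks.ReentrantLock;

    public class ExecutionGuard {
        private final ReentrantLock executionLock = new ReentrantLock();

        // Run the given work while holding the lock; a concurrent caller
        // blocks until the in-progress execution finishes.
        public void runGuarded(Runnable work) {
            executionLock.lock();
            try {
                work.run();
            } finally {
                // Always release, even if work.run() throws.
                executionLock.unlock();
            }
        }
    }

The lock()/try/finally/unlock() shape above is repeated in each of the three
grouping branches (by field, by time, by size) in the diff below.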

diff --git a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java
index e3ba0cc..d2064ba 100644
--- a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java
+++ b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java
@@ -37,12 +37,14 @@ import org.apache.hop.pipeline.transform.ITransform;
 import org.apache.hop.pipeline.transform.TransformMeta;
 
 import java.util.*;
+import java.util.concurrent.locks.ReentrantLock;
 
 /** Execute a pipeline for every input row, set parameters. */
 public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, PipelineExecutorData>
     implements ITransform<PipelineExecutorMeta, PipelineExecutorData> {
 
   private static final Class<?> PKG = PipelineExecutorMeta.class; // For Translator
+  private final ReentrantLock executionLock = new ReentrantLock();
 
   public PipelineExecutor(
       TransformMeta transformMeta,
@@ -62,98 +64,134 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
    */
   @Override
   public boolean processRow() throws HopException {
-    try {
-      PipelineExecutorData pipelineExecutorData = getData();
-      // Wait for a row...
-      Object[] row = getRow();
-
-      if (row == null) {
-        executePipeline(null);
-        setOutputDone();
-        return false;
-      }
 
-      List<String> incomingFieldValues = new ArrayList<>();
-      if (getInputRowMeta() != null) {
-        for (int i = 0; i < getInputRowMeta().size(); i++) {
-          String fieldvalue = getInputRowMeta().getString(row, i);
-          incomingFieldValues.add(fieldvalue);
-        }
-      }
+    PipelineExecutorData pipelineExecutorData = getData();
+    // Wait for a row...
+    Object[] row = getRow();
 
-      if (first) {
+    if (row == null) {
+      executePipeline(null);
+      setOutputDone();
+      return false;
+    }
 
-        first = false;
-        if (!meta.isFilenameInField()) {
-          initOnFirstProcessingIteration();
-        }
+    List<String> incomingFieldValues = new ArrayList<>();
+    if (getInputRowMeta() != null) {
+      for (int i = 0; i < getInputRowMeta().size(); i++) {
+        String fieldvalue = getInputRowMeta().getString(row, i);
+        incomingFieldValues.add(fieldvalue);
       }
+    }
 
-      if (meta.isFilenameInField()) {
+    if (first) {
+      first = false;
+      if (!meta.isFilenameInField()) {
+        initPipelineProcessingIteration();
+      } else {
         IRowMeta rowMeta = getInputRowMeta();
-        int pos = rowMeta.indexOfValue(meta.getFilenameField());
-        String filename = (String) row[pos];
-        if (pipelineExecutorData.prevFilename == null
-                || !pipelineExecutorData.prevFilename.equals(filename)) {
-          logDetailed("Identified a new pipeline to execute: '" + filename + "'");
-          meta.setFilename(filename);
-          pipelineExecutorData.prevFilename = filename;
-          pipelineExecutorData.groupBuffer = null;
-          initPipeline(pipelineExecutorData);
-          initOnFirstProcessingIteration();
-        }
+        pipelineExecutorData.fieldFilenameIndex = rowMeta.indexOfValue(meta.getFilenameField());
       }
+    }
 
-      IRowSet executorTransformOutputRowSet =
-              pipelineExecutorData.getExecutorTransformOutputRowSet();
-      if (pipelineExecutorData.getExecutorTransformOutputRowMeta() != null
-              && executorTransformOutputRowSet != null) {
-        putRowTo(
-                pipelineExecutorData.getExecutorTransformOutputRowMeta(),
-                row,
-                executorTransformOutputRowSet);
-      }
+    IRowSet executorTransformOutputRowSet = pipelineExecutorData.getExecutorTransformOutputRowSet();
+    if (pipelineExecutorData.getExecutorTransformOutputRowMeta() != null
+        && executorTransformOutputRowSet != null) {
+      putRowTo(
+          pipelineExecutorData.getExecutorTransformOutputRowMeta(),
+          row,
+          executorTransformOutputRowSet);
+    }
 
-      // Grouping by field and execution time works ONLY if grouping by size is disabled.
-      if (pipelineExecutorData.groupSize < 0) {
-        if (pipelineExecutorData.groupFieldIndex >= 0) { // grouping by field
-          Object groupFieldData = row[pipelineExecutorData.groupFieldIndex];
-          if (pipelineExecutorData.prevGroupFieldData != null) {
-            if (pipelineExecutorData.groupFieldMeta.compare(
-                    pipelineExecutorData.prevGroupFieldData, groupFieldData)
-                != 0) {
-              executePipeline(getLastIncomingFieldValues());
+    // Grouping by field and execution time works ONLY if grouping by size is disabled.
+    if (pipelineExecutorData.groupSize < 0) {
+      if (pipelineExecutorData.groupFieldIndex >= 0) { // grouping by field
+        Object groupFieldData = row[pipelineExecutorData.groupFieldIndex];
+        if (pipelineExecutorData.prevGroupFieldData != null) {
+          if (pipelineExecutorData.groupFieldMeta.compare(
+                  pipelineExecutorData.prevGroupFieldData, groupFieldData)
+              != 0) {
+            // Let's lock data collection for a while until in-progress pipeline finishes execution
+            executionLock.lock();
+            log.logDebug("Lock set (Group By Field)");
+            try {
+              if (meta.isFilenameInField()) {
+                String filename = (String) row[pipelineExecutorData.fieldFilenameIndex];
+                checkIfPipelineChanged(pipelineExecutorData, filename);
+                executePipeline(getLastIncomingFieldValues());
+              } else executePipeline(getLastIncomingFieldValues());
+            } finally {
+              executionLock.unlock();
+              log.logDebug("Lock removed (Group By Field)");
             }
           }
-          pipelineExecutorData.prevGroupFieldData = groupFieldData;
-        } else if (pipelineExecutorData.groupTime > 0) { // grouping by execution time
-          long now = System.currentTimeMillis();
-          if (now - pipelineExecutorData.groupTimeStart >= pipelineExecutorData.groupTime) {
-            executePipeline(incomingFieldValues);
+        }
+        pipelineExecutorData.prevGroupFieldData = groupFieldData;
+      } else if (pipelineExecutorData.groupTime > 0) { // grouping by execution time
+        long now = System.currentTimeMillis();
+        if (now - pipelineExecutorData.groupTimeStart >= pipelineExecutorData.groupTime) {
+          executionLock.lock();
+          log.logDebug("Lock set (Group By Time)");
+          try {
+            if (meta.isFilenameInField()) {
+              String filename = (String) row[pipelineExecutorData.fieldFilenameIndex];
+              checkIfPipelineChanged(pipelineExecutorData, filename);
+
+              executePipeline(incomingFieldValues);
+            } else executePipeline(incomingFieldValues);
+          } finally {
+            executionLock.unlock();
+            log.logDebug("Lock removed (Group By Time)");
           }
         }
       }
+    }
 
-      // Add next value AFTER pipeline execution, in case we are grouping by field,
-      // and BEFORE checking size of a group, in case we are grouping by size.
-      pipelineExecutorData.groupBuffer.add(
-          new RowMetaAndData(getInputRowMeta(), row)); // should we clone for safety?
-
-      // Grouping by size.
-      // If group buffer size exceeds specified limit, then execute pipeline and flush group buffer.
-      if (pipelineExecutorData.groupSize > 0) {
-        if (pipelineExecutorData.groupBuffer.size() >= pipelineExecutorData.groupSize) {
-          executePipeline(incomingFieldValues);
+    // Add next value AFTER pipeline execution, in case we are grouping by field,
+    // and BEFORE checking size of a group, in case we are grouping by size.
+    pipelineExecutorData.groupBuffer.add(
+        new RowMetaAndData(getInputRowMeta(), row)); // should we clone for safety?
+
+    // Grouping by size.
+    // If group buffer size exceeds specified limit, then execute pipeline and flush group buffer.
+    if (pipelineExecutorData.groupSize > 0) {
+      if (pipelineExecutorData.groupBuffer.size() >= pipelineExecutorData.groupSize) {
+        executionLock.lock();
+        log.logDebug("Lock set (Group By Size)");
+        try {
+          if (meta.isFilenameInField()) {
+            String filename = (String) row[pipelineExecutorData.fieldFilenameIndex];
+            checkIfPipelineChanged(pipelineExecutorData, filename);
+            executePipeline(incomingFieldValues);
+          } else executePipeline(incomingFieldValues);
+        } finally {
+          executionLock.unlock();
+          log.logDebug("Lock removed (Group By Size)");
         }
       }
+    }
+    return true;
+  }
+
+  private void checkIfPipelineChanged(PipelineExecutorData pipelineExecutorData, String filename)
+      throws HopException {
+
+    if (pipelineExecutorData.prevFilename == null
+        || !pipelineExecutorData.prevFilename.equals(filename)) {
+      logDetailed("Identified a new pipeline to execute: '" + filename + "'");
 
-      return true;
-    } catch (Exception e) {
-      throw new HopException(BaseMessages.getString(PKG, "PipelineExecutor.UnexpectedError"), e);
+      meta.setFilename(filename);
+      pipelineExecutorData.prevFilename = filename;
+
+      try {
+        pipelineExecutorData.setExecutorPipelineMeta(loadExecutorPipelineMeta());
+      } catch (Exception e) {
+        throw new HopException("No valid pipeline was specified nor loaded!");
+      }
+      initPipelineProcessingIteration();
     }
   }
 
-  private void initOnFirstProcessingIteration() throws HopException {
+  private void initPipelineProcessingIteration() throws HopException {
     PipelineExecutorData pipelineExecutorData = getData();
     // internal pipeline's first transform has exactly the same input
     pipelineExecutorData.setInputRowMeta(getInputRowMeta());
@@ -212,14 +250,13 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
 
     PipelineExecutorData pipelineExecutorData = getData();
     // If we got 0 rows on input we don't really want to execute the pipeline
-    if (pipelineExecutorData.groupBuffer == null || pipelineExecutorData.groupBuffer.isEmpty()) {
+    if (pipelineExecutorData.groupBuffer.isEmpty()) {
       return;
     }
     pipelineExecutorData.groupTimeStart = System.currentTimeMillis();
 
-    if (first) {
-      discardLogLines(pipelineExecutorData);
-    }
+    // Clear log buffer before new pipeline gets executed
+    discardLogLines(pipelineExecutorData);
 
     IPipelineEngine<PipelineMeta> executorPipeline = createInternalPipeline();
     pipelineExecutorData.setExecutorPipeline(executorPipeline);
@@ -496,6 +533,7 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
     boolean transformSuccessfullyInitialized = super.init();
 
     if (transformSuccessfullyInitialized) {
+      initPipeline(pipelineExecutorData);
       // First we need to load the mapping (pipeline)
       try {
         if ((!meta.isFilenameInField() && Utils.isEmpty(meta.getFilename()))
@@ -503,12 +541,16 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
           logError("No pipeline filename given either in path or in a field!");
           transformSuccessfullyInitialized = false;
         } else {
-
           if (!meta.isFilenameInField() && !Utils.isEmpty(meta.getFilename())) {
-            transformSuccessfullyInitialized = initPipeline(pipelineExecutorData);
+            pipelineExecutorData.setExecutorPipelineMeta(loadExecutorPipelineMeta());
+            if (pipelineExecutorData.getExecutorPipelineMeta() != null) {
+              transformSuccessfullyInitialized = true;
+            } else {
+              transformSuccessfullyInitialized = false;
+              logError("No valid pipeline was specified nor loaded!");
+            }
           }
         }
-
       } catch (Exception e) {
         logError("Unable to load the pipeline executor because of an error : ", e);
       }
@@ -518,37 +560,28 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
     return transformSuccessfullyInitialized;
   }
 
-  private boolean initPipeline(PipelineExecutorData pipelineExecutorData) throws HopException {
-
-    pipelineExecutorData.setExecutorPipelineMeta(loadExecutorPipelineMeta());
+  private void initPipeline(PipelineExecutorData pipelineExecutorData) {
 
     // Do we have a pipeline at all?
-    if (pipelineExecutorData.getExecutorPipelineMeta() != null) {
-      pipelineExecutorData.groupBuffer = new ArrayList<>();
+    pipelineExecutorData.groupBuffer = new ArrayList<>();
 
-      // How many rows do we group together for the pipeline?
-      if (!Utils.isEmpty(meta.getGroupSize())) {
-        pipelineExecutorData.groupSize = Const.toInt(resolve(meta.getGroupSize()), -1);
-      } else {
-        pipelineExecutorData.groupSize = -1;
-      }
-      // Is there a grouping time set?
-      if (!Utils.isEmpty(meta.getGroupTime())) {
-        pipelineExecutorData.groupTime = Const.toInt(resolve(meta.getGroupTime()), -1);
-      } else {
-        pipelineExecutorData.groupTime = -1;
-      }
-      pipelineExecutorData.groupTimeStart = System.currentTimeMillis();
-
-      // Is there a grouping field set?
-      if (!Utils.isEmpty(meta.getGroupField())) {
-        pipelineExecutorData.groupField = resolve(meta.getGroupField());
-      }
-      // That's all for now...
-      return true;
+    // How many rows do we group together for the pipeline?
+    if (!Utils.isEmpty(meta.getGroupSize())) {
+      pipelineExecutorData.groupSize = Const.toInt(resolve(meta.getGroupSize()), -1);
     } else {
-      logError("No valid pipeline was specified nor loaded!");
-      return false;
+      pipelineExecutorData.groupSize = -1;
+    }
+    // Is there a grouping time set?
+    if (!Utils.isEmpty(meta.getGroupTime())) {
+      pipelineExecutorData.groupTime = Const.toInt(resolve(meta.getGroupTime()), -1);
+    } else {
+      pipelineExecutorData.groupTime = -1;
+    }
+    pipelineExecutorData.groupTimeStart = System.currentTimeMillis();
+
+    // Is there a grouping field set?
+    if (!Utils.isEmpty(meta.getGroupField())) {
+      pipelineExecutorData.groupField = resolve(meta.getGroupField());
     }
   }
 
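The new checkIfPipelineChanged() above is essentially a compare-and-reload
cache keyed on the incoming filename. A stripped-down sketch of the idea
(prevFilename matches the real field; the comment stands in for
loadExecutorPipelineMeta() and initPipelineProcessingIteration()):

    class PipelineChangeDetector {
        private String prevFilename;

        // Reload pipeline metadata only when the filename on the current
        // row differs from the one already loaded.
        void checkIfChanged(String filename) {
            if (prevFilename == null || !prevFilename.equals(filename)) {
                prevFilename = filename;
                // ...load the new executor pipeline metadata and re-run
                // the per-pipeline initialization here...
            }
        }
    }

Because the real check now runs inside the executionLock guard, the metadata
swap can no longer race with an in-progress execution, which is the
synchronization problem HOP-3789 describes.
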
diff --git a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java
index 3b1e465..49aeb41 100644
--- a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java
+++ b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java
@@ -46,6 +46,7 @@ public class PipelineExecutorData extends BaseTransformData implements ITransfor
   public String groupField;
   public int groupFieldIndex;
   public IValueMeta groupFieldMeta;
+  public int fieldFilenameIndex;
   public String prevFilename;
 
   public Object prevGroupFieldData;
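
The one new field in PipelineExecutorData, fieldFilenameIndex, caches the
position of the filename column: indexOfValue() is a name lookup, so the diff
resolves it once in the first-row branch and every later row pays only for an
array access. A sketch of that caching (FilenameIndexCache and its methods
are illustrative names, not part of Hop; the IRowMeta import assumes the
usual org.apache.hop.core.row package):

    import org.apache.hop.core.row.IRowMeta;

    class FilenameIndexCache {
        private int fieldFilenameIndex = -1;

        // Resolve the column position once, on the first row.
        void onFirstRow(IRowMeta rowMeta, String filenameField) {
            fieldFilenameIndex = rowMeta.indexOfValue(filenameField);
        }

        // Afterwards, reading the filename is a plain array access.
        String filenameFor(Object[] row) {
            return (String) row[fieldFilenameIndex];
        }
    }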