You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hop.apache.org by ha...@apache.org on 2022/02/25 13:21:00 UTC

[hop] branch master updated: Revert HOP-3789

This is an automated email from the ASF dual-hosted git repository.

hansva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hop.git


The following commit(s) were added to refs/heads/master by this push:
     new eebee9b  Revert HOP-3789
     new 806483a  Merge pull request #1398 from hansva/master
eebee9b is described below

commit eebee9b22527a5f35b3c814a4065ef823230bca1
Author: Hans Van Akelyen <ha...@gmail.com>
AuthorDate: Fri Feb 25 14:15:37 2022 +0100

    Revert HOP-3789
---
 .../pipelineexecutor/PipelineExecutor.java         | 243 +++++++++------------
 .../pipelineexecutor/PipelineExecutorData.java     |   1 -
 2 files changed, 105 insertions(+), 139 deletions(-)

diff --git a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java
index d2064ba..e3ba0cc 100644
--- a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java
+++ b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java
@@ -37,14 +37,12 @@ import org.apache.hop.pipeline.transform.ITransform;
 import org.apache.hop.pipeline.transform.TransformMeta;
 
 import java.util.*;
-import java.util.concurrent.locks.ReentrantLock;
 
 /** Execute a pipeline for every input row, set parameters. */
 public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, PipelineExecutorData>
     implements ITransform<PipelineExecutorMeta, PipelineExecutorData> {
 
   private static final Class<?> PKG = PipelineExecutorMeta.class; // For Translator
-  private final ReentrantLock executionLock = new ReentrantLock();
 
   public PipelineExecutor(
       TransformMeta transformMeta,
@@ -64,134 +62,98 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
    */
   @Override
   public boolean processRow() throws HopException {
+    try {
+      PipelineExecutorData pipelineExecutorData = getData();
+      // Wait for a row...
+      Object[] row = getRow();
+
+      if (row == null) {
+        executePipeline(null);
+        setOutputDone();
+        return false;
+      }
 
-    PipelineExecutorData pipelineExecutorData = getData();
-    // Wait for a row...
-    Object[] row = getRow();
+      List<String> incomingFieldValues = new ArrayList<>();
+      if (getInputRowMeta() != null) {
+        for (int i = 0; i < getInputRowMeta().size(); i++) {
+          String fieldvalue = getInputRowMeta().getString(row, i);
+          incomingFieldValues.add(fieldvalue);
+        }
+      }
 
-    if (row == null) {
-      executePipeline(null);
-      setOutputDone();
-      return false;
-    }
+      if (first) {
 
-    List<String> incomingFieldValues = new ArrayList<>();
-    if (getInputRowMeta() != null) {
-      for (int i = 0; i < getInputRowMeta().size(); i++) {
-        String fieldvalue = getInputRowMeta().getString(row, i);
-        incomingFieldValues.add(fieldvalue);
+        first = false;
+        if (!meta.isFilenameInField()) {
+          initOnFirstProcessingIteration();
+        }
       }
-    }
 
-    if (first) {
-      first = false;
-      if (!meta.isFilenameInField()) {
-        initPipelineProcessingIteration();
-      } else {
+      if (meta.isFilenameInField()) {
         IRowMeta rowMeta = getInputRowMeta();
-        pipelineExecutorData.fieldFilenameIndex = rowMeta.indexOfValue(meta.getFilenameField());
+        int pos = rowMeta.indexOfValue(meta.getFilenameField());
+        String filename = (String) row[pos];
+        if (pipelineExecutorData.prevFilename == null
+                || !pipelineExecutorData.prevFilename.equals(filename)) {
+          logDetailed("Identified a new pipeline to execute: '" + filename + "'");
+          meta.setFilename(filename);
+          pipelineExecutorData.prevFilename = filename;
+          pipelineExecutorData.groupBuffer = null;
+          initPipeline(pipelineExecutorData);
+          initOnFirstProcessingIteration();
+        }
       }
-    }
 
-    IRowSet executorTransformOutputRowSet = pipelineExecutorData.getExecutorTransformOutputRowSet();
-    if (pipelineExecutorData.getExecutorTransformOutputRowMeta() != null
-        && executorTransformOutputRowSet != null) {
-      putRowTo(
-          pipelineExecutorData.getExecutorTransformOutputRowMeta(),
-          row,
-          executorTransformOutputRowSet);
-    }
+      IRowSet executorTransformOutputRowSet =
+              pipelineExecutorData.getExecutorTransformOutputRowSet();
+      if (pipelineExecutorData.getExecutorTransformOutputRowMeta() != null
+              && executorTransformOutputRowSet != null) {
+        putRowTo(
+                pipelineExecutorData.getExecutorTransformOutputRowMeta(),
+                row,
+                executorTransformOutputRowSet);
+      }
 
-    // Grouping by field and execution time works ONLY if grouping by size is disabled.
-    if (pipelineExecutorData.groupSize < 0) {
-      if (pipelineExecutorData.groupFieldIndex >= 0) { // grouping by field
-        Object groupFieldData = row[pipelineExecutorData.groupFieldIndex];
-        if (pipelineExecutorData.prevGroupFieldData != null) {
-          if (pipelineExecutorData.groupFieldMeta.compare(
-                  pipelineExecutorData.prevGroupFieldData, groupFieldData)
-              != 0) {
-            // Let's lock data collection for a while until in-progress pipeline finishes execution
-            executionLock.lock();
-            log.logDebug("Lock set (Group By Field)");
-            try {
-              if (meta.isFilenameInField()) {
-                String filename = (String) row[pipelineExecutorData.fieldFilenameIndex];
-                checkIfPipelineChanged(pipelineExecutorData, filename);
-                executePipeline(getLastIncomingFieldValues());
-              } else executePipeline(getLastIncomingFieldValues());
-            } finally {
-              executionLock.unlock();
-              log.logDebug("Lock removed (Group By Field)");
+      // Grouping by field and execution time works ONLY if grouping by size is disabled.
+      if (pipelineExecutorData.groupSize < 0) {
+        if (pipelineExecutorData.groupFieldIndex >= 0) { // grouping by field
+          Object groupFieldData = row[pipelineExecutorData.groupFieldIndex];
+          if (pipelineExecutorData.prevGroupFieldData != null) {
+            if (pipelineExecutorData.groupFieldMeta.compare(
+                    pipelineExecutorData.prevGroupFieldData, groupFieldData)
+                != 0) {
+              executePipeline(getLastIncomingFieldValues());
             }
           }
-        }
-        pipelineExecutorData.prevGroupFieldData = groupFieldData;
-      } else if (pipelineExecutorData.groupTime > 0) { // grouping by execution time
-        long now = System.currentTimeMillis();
-        if (now - pipelineExecutorData.groupTimeStart >= pipelineExecutorData.groupTime) {
-          executionLock.lock();
-          log.logDebug("Lock set (Group By Time)");
-          try {
-            if (meta.isFilenameInField()) {
-              String filename = (String) row[pipelineExecutorData.fieldFilenameIndex];
-              checkIfPipelineChanged(pipelineExecutorData, filename);
-
-              executePipeline(incomingFieldValues);
-            } else executePipeline(incomingFieldValues);
-          } finally {
-            executionLock.unlock();
-            log.logDebug("Lock removed (Group By Time)");
+          pipelineExecutorData.prevGroupFieldData = groupFieldData;
+        } else if (pipelineExecutorData.groupTime > 0) { // grouping by execution time
+          long now = System.currentTimeMillis();
+          if (now - pipelineExecutorData.groupTimeStart >= pipelineExecutorData.groupTime) {
+            executePipeline(incomingFieldValues);
           }
         }
       }
-    }
 
-    // Add next value AFTER pipeline execution, in case we are grouping by field,
-    // and BEFORE checking size of a group, in case we are grouping by size.
-    pipelineExecutorData.groupBuffer.add(
-        new RowMetaAndData(getInputRowMeta(), row)); // should we clone for safety?
-
-    // Grouping by size.
-    // If group buffer size exceeds specified limit, then execute pipeline and flush group buffer.
-    if (pipelineExecutorData.groupSize > 0) {
-      if (pipelineExecutorData.groupBuffer.size() >= pipelineExecutorData.groupSize) {
-        executionLock.lock();
-        log.logDebug("Lock set (Group By Size)");
-        try {
-          if (meta.isFilenameInField()) {
-            String filename = (String) row[pipelineExecutorData.fieldFilenameIndex];
-            checkIfPipelineChanged(pipelineExecutorData, filename);
-            executePipeline(incomingFieldValues);
-          } else executePipeline(incomingFieldValues);
-        } finally {
-          executionLock.unlock();
-          log.logDebug("Lock removed (Group By Size)");
+      // Add next value AFTER pipeline execution, in case we are grouping by field,
+      // and BEFORE checking size of a group, in case we are grouping by size.
+      pipelineExecutorData.groupBuffer.add(
+          new RowMetaAndData(getInputRowMeta(), row)); // should we clone for safety?
+
+      // Grouping by size.
+      // If group buffer size exceeds specified limit, then execute pipeline and flush group buffer.
+      if (pipelineExecutorData.groupSize > 0) {
+        if (pipelineExecutorData.groupBuffer.size() >= pipelineExecutorData.groupSize) {
+          executePipeline(incomingFieldValues);
         }
       }
-    }
-    return true;
-  }
-
-  private void checkIfPipelineChanged(PipelineExecutorData pipelineExecutorData, String filename)
-      throws HopException {
-
-    if (pipelineExecutorData.prevFilename == null
-        || !pipelineExecutorData.prevFilename.equals(filename)) {
-      logDetailed("Identified a new pipeline to execute: '" + filename + "'");
 
-      meta.setFilename(filename);
-      pipelineExecutorData.prevFilename = filename;
-
-      try {
-        pipelineExecutorData.setExecutorPipelineMeta(loadExecutorPipelineMeta());
-      } catch (Exception e) {
-        throw new HopException("No valid pipeline was specified nor loaded!");
-      }
-      initPipelineProcessingIteration();
+      return true;
+    } catch (Exception e) {
+      throw new HopException(BaseMessages.getString(PKG, "PipelineExecutor.UnexpectedError"), e);
     }
   }
 
-  private void initPipelineProcessingIteration() throws HopException {
+  private void initOnFirstProcessingIteration() throws HopException {
     PipelineExecutorData pipelineExecutorData = getData();
     // internal pipeline's first transform has exactly the same input
     pipelineExecutorData.setInputRowMeta(getInputRowMeta());
@@ -250,13 +212,14 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
 
     PipelineExecutorData pipelineExecutorData = getData();
     // If we got 0 rows on input we don't really want to execute the pipeline
-    if (pipelineExecutorData.groupBuffer.isEmpty()) {
+    if (pipelineExecutorData.groupBuffer == null || pipelineExecutorData.groupBuffer.isEmpty()) {
       return;
     }
     pipelineExecutorData.groupTimeStart = System.currentTimeMillis();
 
-    // Clear log buffer before new pipeline gets executed
-    discardLogLines(pipelineExecutorData);
+    if (first) {
+      discardLogLines(pipelineExecutorData);
+    }
 
     IPipelineEngine<PipelineMeta> executorPipeline = createInternalPipeline();
     pipelineExecutorData.setExecutorPipeline(executorPipeline);
@@ -533,7 +496,6 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
     boolean transformSuccessfullyInitialized = super.init();
 
     if (transformSuccessfullyInitialized) {
-      initPipeline(pipelineExecutorData);
       // First we need to load the mapping (pipeline)
       try {
         if ((!meta.isFilenameInField() && Utils.isEmpty(meta.getFilename()))
@@ -541,16 +503,12 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
           logError("No pipeline filename given either in path or in a field!");
           transformSuccessfullyInitialized = false;
         } else {
+
           if (!meta.isFilenameInField() && !Utils.isEmpty(meta.getFilename())) {
-            pipelineExecutorData.setExecutorPipelineMeta(loadExecutorPipelineMeta());
-            if (pipelineExecutorData.getExecutorPipelineMeta() != null) {
-              transformSuccessfullyInitialized = true;
-            } else {
-              transformSuccessfullyInitialized = false;
-              logError("No valid pipeline was specified nor loaded!");
-            }
+            transformSuccessfullyInitialized = initPipeline(pipelineExecutorData);
           }
         }
+
       } catch (Exception e) {
         logError("Unable to load the pipeline executor because of an error : ", e);
       }
@@ -560,28 +518,37 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
     return transformSuccessfullyInitialized;
   }
 
-  private void initPipeline(PipelineExecutorData pipelineExecutorData) {
+  private boolean initPipeline(PipelineExecutorData pipelineExecutorData) throws HopException {
+
+    pipelineExecutorData.setExecutorPipelineMeta(loadExecutorPipelineMeta());
 
     // Do we have a pipeline at all?
-    pipelineExecutorData.groupBuffer = new ArrayList<>();
+    if (pipelineExecutorData.getExecutorPipelineMeta() != null) {
+      pipelineExecutorData.groupBuffer = new ArrayList<>();
 
-    // How many rows do we group together for the pipeline?
-    if (!Utils.isEmpty(meta.getGroupSize())) {
-      pipelineExecutorData.groupSize = Const.toInt(resolve(meta.getGroupSize()), -1);
-    } else {
-      pipelineExecutorData.groupSize = -1;
-    }
-    // Is there a grouping time set?
-    if (!Utils.isEmpty(meta.getGroupTime())) {
-      pipelineExecutorData.groupTime = Const.toInt(resolve(meta.getGroupTime()), -1);
-    } else {
-      pipelineExecutorData.groupTime = -1;
-    }
-    pipelineExecutorData.groupTimeStart = System.currentTimeMillis();
+      // How many rows do we group together for the pipeline?
+      if (!Utils.isEmpty(meta.getGroupSize())) {
+        pipelineExecutorData.groupSize = Const.toInt(resolve(meta.getGroupSize()), -1);
+      } else {
+        pipelineExecutorData.groupSize = -1;
+      }
+      // Is there a grouping time set?
+      if (!Utils.isEmpty(meta.getGroupTime())) {
+        pipelineExecutorData.groupTime = Const.toInt(resolve(meta.getGroupTime()), -1);
+      } else {
+        pipelineExecutorData.groupTime = -1;
+      }
+      pipelineExecutorData.groupTimeStart = System.currentTimeMillis();
 
-    // Is there a grouping field set?
-    if (!Utils.isEmpty(meta.getGroupField())) {
-      pipelineExecutorData.groupField = resolve(meta.getGroupField());
+      // Is there a grouping field set?
+      if (!Utils.isEmpty(meta.getGroupField())) {
+        pipelineExecutorData.groupField = resolve(meta.getGroupField());
+      }
+      // That's all for now...
+      return true;
+    } else {
+      logError("No valid pipeline was specified nor loaded!");
+      return false;
     }
   }
 
diff --git a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java
index 49aeb41..3b1e465 100644
--- a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java
+++ b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java
@@ -46,7 +46,6 @@ public class PipelineExecutorData extends BaseTransformData implements ITransfor
   public String groupField;
   public int groupFieldIndex;
   public IValueMeta groupFieldMeta;
-  public int fieldFilenameIndex;
   public String prevFilename;
 
   public Object prevGroupFieldData;