You are viewing a plain-text version of this content. The link to the canonical version ("here" in the original page) was not preserved in this copy.
Posted to commits@hop.apache.org by ha...@apache.org on 2022/02/25 13:21:00 UTC
[hop] branch master updated: Revert HOP-3789
This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/master by this push:
new eebee9b Revert HOP-3789
new 806483a Merge pull request #1398 from hansva/master
eebee9b is described below
commit eebee9b22527a5f35b3c814a4065ef823230bca1
Author: Hans Van Akelyen <ha...@gmail.com>
AuthorDate: Fri Feb 25 14:15:37 2022 +0100
Revert HOP-3789
---
.../pipelineexecutor/PipelineExecutor.java | 243 +++++++++------------
.../pipelineexecutor/PipelineExecutorData.java | 1 -
2 files changed, 105 insertions(+), 139 deletions(-)
diff --git a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java
index d2064ba..e3ba0cc 100644
--- a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java
+++ b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java
@@ -37,14 +37,12 @@ import org.apache.hop.pipeline.transform.ITransform;
import org.apache.hop.pipeline.transform.TransformMeta;
import java.util.*;
-import java.util.concurrent.locks.ReentrantLock;
/** Execute a pipeline for every input row, set parameters. */
public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, PipelineExecutorData>
implements ITransform<PipelineExecutorMeta, PipelineExecutorData> {
private static final Class<?> PKG = PipelineExecutorMeta.class; // For Translator
- private final ReentrantLock executionLock = new ReentrantLock();
public PipelineExecutor(
TransformMeta transformMeta,
@@ -64,134 +62,98 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
*/
@Override
public boolean processRow() throws HopException {
+ try {
+ PipelineExecutorData pipelineExecutorData = getData();
+ // Wait for a row...
+ Object[] row = getRow();
+
+ if (row == null) {
+ executePipeline(null);
+ setOutputDone();
+ return false;
+ }
- PipelineExecutorData pipelineExecutorData = getData();
- // Wait for a row...
- Object[] row = getRow();
+ List<String> incomingFieldValues = new ArrayList<>();
+ if (getInputRowMeta() != null) {
+ for (int i = 0; i < getInputRowMeta().size(); i++) {
+ String fieldvalue = getInputRowMeta().getString(row, i);
+ incomingFieldValues.add(fieldvalue);
+ }
+ }
- if (row == null) {
- executePipeline(null);
- setOutputDone();
- return false;
- }
+ if (first) {
- List<String> incomingFieldValues = new ArrayList<>();
- if (getInputRowMeta() != null) {
- for (int i = 0; i < getInputRowMeta().size(); i++) {
- String fieldvalue = getInputRowMeta().getString(row, i);
- incomingFieldValues.add(fieldvalue);
+ first = false;
+ if (!meta.isFilenameInField()) {
+ initOnFirstProcessingIteration();
+ }
}
- }
- if (first) {
- first = false;
- if (!meta.isFilenameInField()) {
- initPipelineProcessingIteration();
- } else {
+ if (meta.isFilenameInField()) {
IRowMeta rowMeta = getInputRowMeta();
- pipelineExecutorData.fieldFilenameIndex = rowMeta.indexOfValue(meta.getFilenameField());
+ int pos = rowMeta.indexOfValue(meta.getFilenameField());
+ String filename = (String) row[pos];
+ if (pipelineExecutorData.prevFilename == null
+ || !pipelineExecutorData.prevFilename.equals(filename)) {
+ logDetailed("Identified a new pipeline to execute: '" + filename + "'");
+ meta.setFilename(filename);
+ pipelineExecutorData.prevFilename = filename;
+ pipelineExecutorData.groupBuffer = null;
+ initPipeline(pipelineExecutorData);
+ initOnFirstProcessingIteration();
+ }
}
- }
- IRowSet executorTransformOutputRowSet = pipelineExecutorData.getExecutorTransformOutputRowSet();
- if (pipelineExecutorData.getExecutorTransformOutputRowMeta() != null
- && executorTransformOutputRowSet != null) {
- putRowTo(
- pipelineExecutorData.getExecutorTransformOutputRowMeta(),
- row,
- executorTransformOutputRowSet);
- }
+ IRowSet executorTransformOutputRowSet =
+ pipelineExecutorData.getExecutorTransformOutputRowSet();
+ if (pipelineExecutorData.getExecutorTransformOutputRowMeta() != null
+ && executorTransformOutputRowSet != null) {
+ putRowTo(
+ pipelineExecutorData.getExecutorTransformOutputRowMeta(),
+ row,
+ executorTransformOutputRowSet);
+ }
- // Grouping by field and execution time works ONLY if grouping by size is disabled.
- if (pipelineExecutorData.groupSize < 0) {
- if (pipelineExecutorData.groupFieldIndex >= 0) { // grouping by field
- Object groupFieldData = row[pipelineExecutorData.groupFieldIndex];
- if (pipelineExecutorData.prevGroupFieldData != null) {
- if (pipelineExecutorData.groupFieldMeta.compare(
- pipelineExecutorData.prevGroupFieldData, groupFieldData)
- != 0) {
- // Let's lock data collection for a while until in-progress pipeline finishes execution
- executionLock.lock();
- log.logDebug("Lock set (Group By Field)");
- try {
- if (meta.isFilenameInField()) {
- String filename = (String) row[pipelineExecutorData.fieldFilenameIndex];
- checkIfPipelineChanged(pipelineExecutorData, filename);
- executePipeline(getLastIncomingFieldValues());
- } else executePipeline(getLastIncomingFieldValues());
- } finally {
- executionLock.unlock();
- log.logDebug("Lock removed (Group By Field)");
+ // Grouping by field and execution time works ONLY if grouping by size is disabled.
+ if (pipelineExecutorData.groupSize < 0) {
+ if (pipelineExecutorData.groupFieldIndex >= 0) { // grouping by field
+ Object groupFieldData = row[pipelineExecutorData.groupFieldIndex];
+ if (pipelineExecutorData.prevGroupFieldData != null) {
+ if (pipelineExecutorData.groupFieldMeta.compare(
+ pipelineExecutorData.prevGroupFieldData, groupFieldData)
+ != 0) {
+ executePipeline(getLastIncomingFieldValues());
}
}
- }
- pipelineExecutorData.prevGroupFieldData = groupFieldData;
- } else if (pipelineExecutorData.groupTime > 0) { // grouping by execution time
- long now = System.currentTimeMillis();
- if (now - pipelineExecutorData.groupTimeStart >= pipelineExecutorData.groupTime) {
- executionLock.lock();
- log.logDebug("Lock set (Group By Time)");
- try {
- if (meta.isFilenameInField()) {
- String filename = (String) row[pipelineExecutorData.fieldFilenameIndex];
- checkIfPipelineChanged(pipelineExecutorData, filename);
-
- executePipeline(incomingFieldValues);
- } else executePipeline(incomingFieldValues);
- } finally {
- executionLock.unlock();
- log.logDebug("Lock removed (Group By Time)");
+ pipelineExecutorData.prevGroupFieldData = groupFieldData;
+ } else if (pipelineExecutorData.groupTime > 0) { // grouping by execution time
+ long now = System.currentTimeMillis();
+ if (now - pipelineExecutorData.groupTimeStart >= pipelineExecutorData.groupTime) {
+ executePipeline(incomingFieldValues);
}
}
}
- }
- // Add next value AFTER pipeline execution, in case we are grouping by field,
- // and BEFORE checking size of a group, in case we are grouping by size.
- pipelineExecutorData.groupBuffer.add(
- new RowMetaAndData(getInputRowMeta(), row)); // should we clone for safety?
-
- // Grouping by size.
- // If group buffer size exceeds specified limit, then execute pipeline and flush group buffer.
- if (pipelineExecutorData.groupSize > 0) {
- if (pipelineExecutorData.groupBuffer.size() >= pipelineExecutorData.groupSize) {
- executionLock.lock();
- log.logDebug("Lock set (Group By Size)");
- try {
- if (meta.isFilenameInField()) {
- String filename = (String) row[pipelineExecutorData.fieldFilenameIndex];
- checkIfPipelineChanged(pipelineExecutorData, filename);
- executePipeline(incomingFieldValues);
- } else executePipeline(incomingFieldValues);
- } finally {
- executionLock.unlock();
- log.logDebug("Lock removed (Group By Size)");
+ // Add next value AFTER pipeline execution, in case we are grouping by field,
+ // and BEFORE checking size of a group, in case we are grouping by size.
+ pipelineExecutorData.groupBuffer.add(
+ new RowMetaAndData(getInputRowMeta(), row)); // should we clone for safety?
+
+ // Grouping by size.
+ // If group buffer size exceeds specified limit, then execute pipeline and flush group buffer.
+ if (pipelineExecutorData.groupSize > 0) {
+ if (pipelineExecutorData.groupBuffer.size() >= pipelineExecutorData.groupSize) {
+ executePipeline(incomingFieldValues);
}
}
- }
- return true;
- }
-
- private void checkIfPipelineChanged(PipelineExecutorData pipelineExecutorData, String filename)
- throws HopException {
-
- if (pipelineExecutorData.prevFilename == null
- || !pipelineExecutorData.prevFilename.equals(filename)) {
- logDetailed("Identified a new pipeline to execute: '" + filename + "'");
- meta.setFilename(filename);
- pipelineExecutorData.prevFilename = filename;
-
- try {
- pipelineExecutorData.setExecutorPipelineMeta(loadExecutorPipelineMeta());
- } catch (Exception e) {
- throw new HopException("No valid pipeline was specified nor loaded!");
- }
- initPipelineProcessingIteration();
+ return true;
+ } catch (Exception e) {
+ throw new HopException(BaseMessages.getString(PKG, "PipelineExecutor.UnexpectedError"), e);
}
}
- private void initPipelineProcessingIteration() throws HopException {
+ private void initOnFirstProcessingIteration() throws HopException {
PipelineExecutorData pipelineExecutorData = getData();
// internal pipeline's first transform has exactly the same input
pipelineExecutorData.setInputRowMeta(getInputRowMeta());
@@ -250,13 +212,14 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
PipelineExecutorData pipelineExecutorData = getData();
// If we got 0 rows on input we don't really want to execute the pipeline
- if (pipelineExecutorData.groupBuffer.isEmpty()) {
+ if (pipelineExecutorData.groupBuffer == null || pipelineExecutorData.groupBuffer.isEmpty()) {
return;
}
pipelineExecutorData.groupTimeStart = System.currentTimeMillis();
- // Clear log buffer before new pipeline gets executed
- discardLogLines(pipelineExecutorData);
+ if (first) {
+ discardLogLines(pipelineExecutorData);
+ }
IPipelineEngine<PipelineMeta> executorPipeline = createInternalPipeline();
pipelineExecutorData.setExecutorPipeline(executorPipeline);
@@ -533,7 +496,6 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
boolean transformSuccessfullyInitialized = super.init();
if (transformSuccessfullyInitialized) {
- initPipeline(pipelineExecutorData);
// First we need to load the mapping (pipeline)
try {
if ((!meta.isFilenameInField() && Utils.isEmpty(meta.getFilename()))
@@ -541,16 +503,12 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
logError("No pipeline filename given either in path or in a field!");
transformSuccessfullyInitialized = false;
} else {
+
if (!meta.isFilenameInField() && !Utils.isEmpty(meta.getFilename())) {
- pipelineExecutorData.setExecutorPipelineMeta(loadExecutorPipelineMeta());
- if (pipelineExecutorData.getExecutorPipelineMeta() != null) {
- transformSuccessfullyInitialized = true;
- } else {
- transformSuccessfullyInitialized = false;
- logError("No valid pipeline was specified nor loaded!");
- }
+ transformSuccessfullyInitialized = initPipeline(pipelineExecutorData);
}
}
+
} catch (Exception e) {
logError("Unable to load the pipeline executor because of an error : ", e);
}
@@ -560,28 +518,37 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
return transformSuccessfullyInitialized;
}
- private void initPipeline(PipelineExecutorData pipelineExecutorData) {
+ private boolean initPipeline(PipelineExecutorData pipelineExecutorData) throws HopException {
+
+ pipelineExecutorData.setExecutorPipelineMeta(loadExecutorPipelineMeta());
// Do we have a pipeline at all?
- pipelineExecutorData.groupBuffer = new ArrayList<>();
+ if (pipelineExecutorData.getExecutorPipelineMeta() != null) {
+ pipelineExecutorData.groupBuffer = new ArrayList<>();
- // How many rows do we group together for the pipeline?
- if (!Utils.isEmpty(meta.getGroupSize())) {
- pipelineExecutorData.groupSize = Const.toInt(resolve(meta.getGroupSize()), -1);
- } else {
- pipelineExecutorData.groupSize = -1;
- }
- // Is there a grouping time set?
- if (!Utils.isEmpty(meta.getGroupTime())) {
- pipelineExecutorData.groupTime = Const.toInt(resolve(meta.getGroupTime()), -1);
- } else {
- pipelineExecutorData.groupTime = -1;
- }
- pipelineExecutorData.groupTimeStart = System.currentTimeMillis();
+ // How many rows do we group together for the pipeline?
+ if (!Utils.isEmpty(meta.getGroupSize())) {
+ pipelineExecutorData.groupSize = Const.toInt(resolve(meta.getGroupSize()), -1);
+ } else {
+ pipelineExecutorData.groupSize = -1;
+ }
+ // Is there a grouping time set?
+ if (!Utils.isEmpty(meta.getGroupTime())) {
+ pipelineExecutorData.groupTime = Const.toInt(resolve(meta.getGroupTime()), -1);
+ } else {
+ pipelineExecutorData.groupTime = -1;
+ }
+ pipelineExecutorData.groupTimeStart = System.currentTimeMillis();
- // Is there a grouping field set?
- if (!Utils.isEmpty(meta.getGroupField())) {
- pipelineExecutorData.groupField = resolve(meta.getGroupField());
+ // Is there a grouping field set?
+ if (!Utils.isEmpty(meta.getGroupField())) {
+ pipelineExecutorData.groupField = resolve(meta.getGroupField());
+ }
+ // That's all for now...
+ return true;
+ } else {
+ logError("No valid pipeline was specified nor loaded!");
+ return false;
}
}
diff --git a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java
index 49aeb41..3b1e465 100644
--- a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java
+++ b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java
@@ -46,7 +46,6 @@ public class PipelineExecutorData extends BaseTransformData implements ITransfor
public String groupField;
public int groupFieldIndex;
public IValueMeta groupFieldMeta;
- public int fieldFilenameIndex;
public String prevFilename;
public Object prevGroupFieldData;