You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hop.apache.org by ha...@apache.org on 2022/02/24 15:41:10 UTC
[hop] branch master updated: HOP-3789 PipelineExecutor: synchronization needed on pipeline change
This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/master by this push:
new 0cc9d79 HOP-3789 PipelineExecutor: synchronization needed on pipeline change
new bbadaab Merge pull request #1393 from sramazzina/MEMORYLEAK
0cc9d79 is described below
commit 0cc9d79d90bf33161b5029bd9756da8af3943495
Author: sergio.ramazzina <se...@serasoft.it>
AuthorDate: Thu Feb 24 13:04:01 2022 +0100
HOP-3789 PipelineExecutor: synchronization needed on pipeline change
---
.../pipelineexecutor/PipelineExecutor.java | 243 ++++++++++++---------
.../pipelineexecutor/PipelineExecutorData.java | 1 +
2 files changed, 139 insertions(+), 105 deletions(-)
diff --git a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java
index e3ba0cc..d2064ba 100644
--- a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java
+++ b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutor.java
@@ -37,12 +37,14 @@ import org.apache.hop.pipeline.transform.ITransform;
import org.apache.hop.pipeline.transform.TransformMeta;
import java.util.*;
+import java.util.concurrent.locks.ReentrantLock;
/** Execute a pipeline for every input row, set parameters. */
public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, PipelineExecutorData>
implements ITransform<PipelineExecutorMeta, PipelineExecutorData> {
private static final Class<?> PKG = PipelineExecutorMeta.class; // For Translator
+ private final ReentrantLock executionLock = new ReentrantLock();
public PipelineExecutor(
TransformMeta transformMeta,
@@ -62,98 +64,134 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
*/
@Override
public boolean processRow() throws HopException {
- try {
- PipelineExecutorData pipelineExecutorData = getData();
- // Wait for a row...
- Object[] row = getRow();
-
- if (row == null) {
- executePipeline(null);
- setOutputDone();
- return false;
- }
- List<String> incomingFieldValues = new ArrayList<>();
- if (getInputRowMeta() != null) {
- for (int i = 0; i < getInputRowMeta().size(); i++) {
- String fieldvalue = getInputRowMeta().getString(row, i);
- incomingFieldValues.add(fieldvalue);
- }
- }
+ PipelineExecutorData pipelineExecutorData = getData();
+ // Wait for a row...
+ Object[] row = getRow();
- if (first) {
+ if (row == null) {
+ executePipeline(null);
+ setOutputDone();
+ return false;
+ }
- first = false;
- if (!meta.isFilenameInField()) {
- initOnFirstProcessingIteration();
- }
+ List<String> incomingFieldValues = new ArrayList<>();
+ if (getInputRowMeta() != null) {
+ for (int i = 0; i < getInputRowMeta().size(); i++) {
+ String fieldvalue = getInputRowMeta().getString(row, i);
+ incomingFieldValues.add(fieldvalue);
}
+ }
- if (meta.isFilenameInField()) {
+ if (first) {
+ first = false;
+ if (!meta.isFilenameInField()) {
+ initPipelineProcessingIteration();
+ } else {
IRowMeta rowMeta = getInputRowMeta();
- int pos = rowMeta.indexOfValue(meta.getFilenameField());
- String filename = (String) row[pos];
- if (pipelineExecutorData.prevFilename == null
- || !pipelineExecutorData.prevFilename.equals(filename)) {
- logDetailed("Identified a new pipeline to execute: '" + filename + "'");
- meta.setFilename(filename);
- pipelineExecutorData.prevFilename = filename;
- pipelineExecutorData.groupBuffer = null;
- initPipeline(pipelineExecutorData);
- initOnFirstProcessingIteration();
- }
+ pipelineExecutorData.fieldFilenameIndex = rowMeta.indexOfValue(meta.getFilenameField());
}
+ }
- IRowSet executorTransformOutputRowSet =
- pipelineExecutorData.getExecutorTransformOutputRowSet();
- if (pipelineExecutorData.getExecutorTransformOutputRowMeta() != null
- && executorTransformOutputRowSet != null) {
- putRowTo(
- pipelineExecutorData.getExecutorTransformOutputRowMeta(),
- row,
- executorTransformOutputRowSet);
- }
+ IRowSet executorTransformOutputRowSet = pipelineExecutorData.getExecutorTransformOutputRowSet();
+ if (pipelineExecutorData.getExecutorTransformOutputRowMeta() != null
+ && executorTransformOutputRowSet != null) {
+ putRowTo(
+ pipelineExecutorData.getExecutorTransformOutputRowMeta(),
+ row,
+ executorTransformOutputRowSet);
+ }
- // Grouping by field and execution time works ONLY if grouping by size is disabled.
- if (pipelineExecutorData.groupSize < 0) {
- if (pipelineExecutorData.groupFieldIndex >= 0) { // grouping by field
- Object groupFieldData = row[pipelineExecutorData.groupFieldIndex];
- if (pipelineExecutorData.prevGroupFieldData != null) {
- if (pipelineExecutorData.groupFieldMeta.compare(
- pipelineExecutorData.prevGroupFieldData, groupFieldData)
- != 0) {
- executePipeline(getLastIncomingFieldValues());
+ // Grouping by field and execution time works ONLY if grouping by size is disabled.
+ if (pipelineExecutorData.groupSize < 0) {
+ if (pipelineExecutorData.groupFieldIndex >= 0) { // grouping by field
+ Object groupFieldData = row[pipelineExecutorData.groupFieldIndex];
+ if (pipelineExecutorData.prevGroupFieldData != null) {
+ if (pipelineExecutorData.groupFieldMeta.compare(
+ pipelineExecutorData.prevGroupFieldData, groupFieldData)
+ != 0) {
+ // Let's lock data collection for a while until in-progress pipeline finishes execution
+ executionLock.lock();
+ log.logDebug("Lock set (Group By Field)");
+ try {
+ if (meta.isFilenameInField()) {
+ String filename = (String) row[pipelineExecutorData.fieldFilenameIndex];
+ checkIfPipelineChanged(pipelineExecutorData, filename);
+ executePipeline(getLastIncomingFieldValues());
+ } else executePipeline(getLastIncomingFieldValues());
+ } finally {
+ executionLock.unlock();
+ log.logDebug("Lock removed (Group By Field)");
}
}
- pipelineExecutorData.prevGroupFieldData = groupFieldData;
- } else if (pipelineExecutorData.groupTime > 0) { // grouping by execution time
- long now = System.currentTimeMillis();
- if (now - pipelineExecutorData.groupTimeStart >= pipelineExecutorData.groupTime) {
- executePipeline(incomingFieldValues);
+ }
+ pipelineExecutorData.prevGroupFieldData = groupFieldData;
+ } else if (pipelineExecutorData.groupTime > 0) { // grouping by execution time
+ long now = System.currentTimeMillis();
+ if (now - pipelineExecutorData.groupTimeStart >= pipelineExecutorData.groupTime) {
+ executionLock.lock();
+ log.logDebug("Lock set (Group By Time)");
+ try {
+ if (meta.isFilenameInField()) {
+ String filename = (String) row[pipelineExecutorData.fieldFilenameIndex];
+ checkIfPipelineChanged(pipelineExecutorData, filename);
+
+ executePipeline(incomingFieldValues);
+ } else executePipeline(incomingFieldValues);
+ } finally {
+ executionLock.unlock();
+ log.logDebug("Lock removed (Group By Time)");
}
}
}
+ }
- // Add next value AFTER pipeline execution, in case we are grouping by field,
- // and BEFORE checking size of a group, in case we are grouping by size.
- pipelineExecutorData.groupBuffer.add(
- new RowMetaAndData(getInputRowMeta(), row)); // should we clone for safety?
-
- // Grouping by size.
- // If group buffer size exceeds specified limit, then execute pipeline and flush group buffer.
- if (pipelineExecutorData.groupSize > 0) {
- if (pipelineExecutorData.groupBuffer.size() >= pipelineExecutorData.groupSize) {
- executePipeline(incomingFieldValues);
+ // Add next value AFTER pipeline execution, in case we are grouping by field,
+ // and BEFORE checking size of a group, in case we are grouping by size.
+ pipelineExecutorData.groupBuffer.add(
+ new RowMetaAndData(getInputRowMeta(), row)); // should we clone for safety?
+
+ // Grouping by size.
+ // If group buffer size exceeds specified limit, then execute pipeline and flush group buffer.
+ if (pipelineExecutorData.groupSize > 0) {
+ if (pipelineExecutorData.groupBuffer.size() >= pipelineExecutorData.groupSize) {
+ executionLock.lock();
+ log.logDebug("Lock set (Group By Size)");
+ try {
+ if (meta.isFilenameInField()) {
+ String filename = (String) row[pipelineExecutorData.fieldFilenameIndex];
+ checkIfPipelineChanged(pipelineExecutorData, filename);
+ executePipeline(incomingFieldValues);
+ } else executePipeline(incomingFieldValues);
+ } finally {
+ executionLock.unlock();
+ log.logDebug("Lock removed (Group By Size)");
}
}
+ }
+ return true;
+ }
+
+ private void checkIfPipelineChanged(PipelineExecutorData pipelineExecutorData, String filename)
+ throws HopException {
+
+ if (pipelineExecutorData.prevFilename == null
+ || !pipelineExecutorData.prevFilename.equals(filename)) {
+ logDetailed("Identified a new pipeline to execute: '" + filename + "'");
- return true;
- } catch (Exception e) {
- throw new HopException(BaseMessages.getString(PKG, "PipelineExecutor.UnexpectedError"), e);
+ meta.setFilename(filename);
+ pipelineExecutorData.prevFilename = filename;
+
+ try {
+ pipelineExecutorData.setExecutorPipelineMeta(loadExecutorPipelineMeta());
+ } catch (Exception e) {
+ throw new HopException("No valid pipeline was specified nor loaded!");
+ }
+ initPipelineProcessingIteration();
}
}
- private void initOnFirstProcessingIteration() throws HopException {
+ private void initPipelineProcessingIteration() throws HopException {
PipelineExecutorData pipelineExecutorData = getData();
// internal pipeline's first transform has exactly the same input
pipelineExecutorData.setInputRowMeta(getInputRowMeta());
@@ -212,14 +250,13 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
PipelineExecutorData pipelineExecutorData = getData();
// If we got 0 rows on input we don't really want to execute the pipeline
- if (pipelineExecutorData.groupBuffer == null || pipelineExecutorData.groupBuffer.isEmpty()) {
+ if (pipelineExecutorData.groupBuffer.isEmpty()) {
return;
}
pipelineExecutorData.groupTimeStart = System.currentTimeMillis();
- if (first) {
- discardLogLines(pipelineExecutorData);
- }
+ // Clear log buffer before new pipeline gets executed
+ discardLogLines(pipelineExecutorData);
IPipelineEngine<PipelineMeta> executorPipeline = createInternalPipeline();
pipelineExecutorData.setExecutorPipeline(executorPipeline);
@@ -496,6 +533,7 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
boolean transformSuccessfullyInitialized = super.init();
if (transformSuccessfullyInitialized) {
+ initPipeline(pipelineExecutorData);
// First we need to load the mapping (pipeline)
try {
if ((!meta.isFilenameInField() && Utils.isEmpty(meta.getFilename()))
@@ -503,12 +541,16 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
logError("No pipeline filename given either in path or in a field!");
transformSuccessfullyInitialized = false;
} else {
-
if (!meta.isFilenameInField() && !Utils.isEmpty(meta.getFilename())) {
- transformSuccessfullyInitialized = initPipeline(pipelineExecutorData);
+ pipelineExecutorData.setExecutorPipelineMeta(loadExecutorPipelineMeta());
+ if (pipelineExecutorData.getExecutorPipelineMeta() != null) {
+ transformSuccessfullyInitialized = true;
+ } else {
+ transformSuccessfullyInitialized = false;
+ logError("No valid pipeline was specified nor loaded!");
+ }
}
}
-
} catch (Exception e) {
logError("Unable to load the pipeline executor because of an error : ", e);
}
@@ -518,37 +560,28 @@ public class PipelineExecutor extends BaseTransform<PipelineExecutorMeta, Pipeli
return transformSuccessfullyInitialized;
}
- private boolean initPipeline(PipelineExecutorData pipelineExecutorData) throws HopException {
-
- pipelineExecutorData.setExecutorPipelineMeta(loadExecutorPipelineMeta());
+ private void initPipeline(PipelineExecutorData pipelineExecutorData) {
// Do we have a pipeline at all?
- if (pipelineExecutorData.getExecutorPipelineMeta() != null) {
- pipelineExecutorData.groupBuffer = new ArrayList<>();
+ pipelineExecutorData.groupBuffer = new ArrayList<>();
- // How many rows do we group together for the pipeline?
- if (!Utils.isEmpty(meta.getGroupSize())) {
- pipelineExecutorData.groupSize = Const.toInt(resolve(meta.getGroupSize()), -1);
- } else {
- pipelineExecutorData.groupSize = -1;
- }
- // Is there a grouping time set?
- if (!Utils.isEmpty(meta.getGroupTime())) {
- pipelineExecutorData.groupTime = Const.toInt(resolve(meta.getGroupTime()), -1);
- } else {
- pipelineExecutorData.groupTime = -1;
- }
- pipelineExecutorData.groupTimeStart = System.currentTimeMillis();
-
- // Is there a grouping field set?
- if (!Utils.isEmpty(meta.getGroupField())) {
- pipelineExecutorData.groupField = resolve(meta.getGroupField());
- }
- // That's all for now...
- return true;
+ // How many rows do we group together for the pipeline?
+ if (!Utils.isEmpty(meta.getGroupSize())) {
+ pipelineExecutorData.groupSize = Const.toInt(resolve(meta.getGroupSize()), -1);
} else {
- logError("No valid pipeline was specified nor loaded!");
- return false;
+ pipelineExecutorData.groupSize = -1;
+ }
+ // Is there a grouping time set?
+ if (!Utils.isEmpty(meta.getGroupTime())) {
+ pipelineExecutorData.groupTime = Const.toInt(resolve(meta.getGroupTime()), -1);
+ } else {
+ pipelineExecutorData.groupTime = -1;
+ }
+ pipelineExecutorData.groupTimeStart = System.currentTimeMillis();
+
+ // Is there a grouping field set?
+ if (!Utils.isEmpty(meta.getGroupField())) {
+ pipelineExecutorData.groupField = resolve(meta.getGroupField());
}
}
diff --git a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java
index 3b1e465..49aeb41 100644
--- a/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java
+++ b/plugins/transforms/pipelineexecutor/src/main/java/org/apache/hop/pipeline/transforms/pipelineexecutor/PipelineExecutorData.java
@@ -46,6 +46,7 @@ public class PipelineExecutorData extends BaseTransformData implements ITransfor
public String groupField;
public int groupFieldIndex;
public IValueMeta groupFieldMeta;
+ public int fieldFilenameIndex;
public String prevFilename;
public Object prevGroupFieldData;