You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hop.apache.org by mc...@apache.org on 2022/09/22 20:19:05 UTC
[hop] branch master updated: HOP-4172 : Forcing to a single thread in Beam doesn't work (fix, IT)
This is an automated email from the ASF dual-hosted git repository.
mcasters pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/master by this push:
new b43d852493 HOP-4172 : Forcing to a single thread in Beam doesn't work (fix, IT)
new 90ba057bde Merge pull request #1704 from mattcasters/master
b43d852493 is described below
commit b43d852493280ebd6ea48086b484ae04d8690cd9
Author: Matt Casters <ma...@gmail.com>
AuthorDate: Thu Sep 22 21:48:27 2022 +0200
HOP-4172 : Forcing to a single thread in Beam doesn't work (fix, IT)
---
assemblies/plugins/tech/parquet/pom.xml | 5 +
.../plugins/tech/parquet/src/assembly/assembly.xml | 1 +
core/src/main/java/org/apache/hop/core/Const.java | 4 +
.../pipeline/SingleThreadedPipelineExecutor.java | 27 +-
.../hop/pipeline/transform/BaseTransform.java | 10 +-
.../hop/pipeline/transform/BaseTransformData.java | 42 ++
.../apache/hop/pipeline/transform/ITransform.java | 19 +-
.../hop/pipeline/transform/ITransformData.java | 29 ++
...pl => 0005-generate-single-file-validation.hpl} | 172 ++++++-
...json-file.hpl => 0005-generate-single-file.hpl} | 183 +++++--
...l => 0006-generate-bundle-files-validation.hpl} | 165 +++++-
...son-file.hpl => 0006-generate-bundle-files.hpl} | 181 +++++--
.../beam_directrunner/main-0005-single-thread.hwf | 12 +-
...ead.hwf => main-0006-generate-bundle-files.hwf} | 20 +-
...0005-generate-single-file-validation UNIT.json} | 65 ++-
...006-generate-bundle-files-validation UNIT.json} | 65 ++-
.../gcp/0007-single-thread-validation.hpl | 158 +++++-
integration-tests/gcp/0007-single-thread.hpl | 212 ++++----
.../0007-single-thread-validation UNIT.json | 49 ++
.../hop/beam/core/transform/TransformBaseFn.java | 39 ++
.../core/transform/TransformBatchTransform.java | 35 --
.../hop/beam/core/transform/TransformFn.java | 551 +++++++++++----------
.../beam/core/transform/TransformTransform.java | 24 -
.../HopPipelineMetaToBeamPipelineConverter.java | 6 +-
.../apache/hop/neo4j/transforms/cypher/Cypher.java | 4 +-
plugins/tech/parquet/pom.xml | 2 +-
.../parquet/transforms/output/ParquetOutput.java | 22 +
.../excelwriter/ExcelWriterFileField.java | 2 +-
.../excelwriter/ExcelWriterTransform.java | 102 ++--
.../excelwriter/ExcelWriterTransformData.java | 1 -
.../excelwriter/ExcelWriterTransformMeta.java | 11 +-
.../excelwriter/messages/messages_en_US.properties | 45 +-
.../transforms/javascript/ScriptValuesDummy.java | 47 +-
.../transforms/jsonoutput/BaseFileOutputMeta.java | 59 ++-
.../pipeline/transforms/jsonoutput/JsonOutput.java | 53 +-
.../transforms/jsonoutput/JsonOutputMeta.java | 2 +
.../hop/pipeline/transforms/sort/SortRows.java | 13 +
.../hop/pipeline/transforms/sort/SortRowsData.java | 2 +-
.../transforms/textfileoutput/TextFileOutput.java | 28 ++
.../textfileoutput/TextFileOutputMeta.java | 237 ++++++---
.../textfileoutput/TextFileOutputMetaTest.java | 4 +-
.../textfileoutput/TextFileOutputTest.java | 12 +
42 files changed, 1918 insertions(+), 802 deletions(-)
diff --git a/assemblies/plugins/tech/parquet/pom.xml b/assemblies/plugins/tech/parquet/pom.xml
index 386e4ec8e7..7821315428 100644
--- a/assemblies/plugins/tech/parquet/pom.xml
+++ b/assemblies/plugins/tech/parquet/pom.xml
@@ -123,6 +123,11 @@
<version>${hadoop.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.woodstox</groupId>
+ <artifactId>woodstox-core</artifactId>
+ <version>5.3.0</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/assemblies/plugins/tech/parquet/src/assembly/assembly.xml b/assemblies/plugins/tech/parquet/src/assembly/assembly.xml
index c2c6bc0ebd..2d7ae7ef84 100644
--- a/assemblies/plugins/tech/parquet/src/assembly/assembly.xml
+++ b/assemblies/plugins/tech/parquet/src/assembly/assembly.xml
@@ -51,6 +51,7 @@
<useProjectArtifact>false</useProjectArtifact>
<scope>runtime</scope>
<includes>
+ <include>com.fasterxml.woodstox:woodstox-core</include>
<include>com.github.luben:zstd-jni:jar</include>
<include>com.google.code.findbugs:jsr305:jar</include>
<include>com.google.protobuf:protobuf-java:jar</include>
diff --git a/core/src/main/java/org/apache/hop/core/Const.java b/core/src/main/java/org/apache/hop/core/Const.java
index 9adf5d39f1..aff5930bdb 100644
--- a/core/src/main/java/org/apache/hop/core/Const.java
+++ b/core/src/main/java/org/apache/hop/core/Const.java
@@ -342,6 +342,10 @@ public class Const {
public static final String INTERNAL_VARIABLE_TRANSFORM_ID =
INTERNAL_VARIABLE_PREFIX + ".Transform.ID";
+ public static final String INTERNAL_VARIABLE_TRANSFORM_BUNDLE_NR =
+ INTERNAL_VARIABLE_PREFIX + ".Transform.BundleNr";
+
+
public static final String INTERNAL_VARIABLE_ACTION_ID =
INTERNAL_VARIABLE_PREFIX + ".Action.ID";
diff --git a/engine/src/main/java/org/apache/hop/pipeline/SingleThreadedPipelineExecutor.java b/engine/src/main/java/org/apache/hop/pipeline/SingleThreadedPipelineExecutor.java
index 75109f83fd..420ba9b3b7 100644
--- a/engine/src/main/java/org/apache/hop/pipeline/SingleThreadedPipelineExecutor.java
+++ b/engine/src/main/java/org/apache/hop/pipeline/SingleThreadedPipelineExecutor.java
@@ -387,7 +387,8 @@ public class SingleThreadedPipelineExecutor {
"An exception was raised during a transform's execution: "
+ inProcessCombi.transformName);
this.exceptionsRaisedCounter += 1;
- } else throw new HopException("Error performing an iteration in a single threaded pipeline", e);
+ } else
+ throw new HopException("Error performing an iteration in a single threaded pipeline", e);
}
return nrDone < transforms.size() && !pipeline.isStopped();
}
@@ -430,12 +431,12 @@ public class SingleThreadedPipelineExecutor {
String.valueOf(lu),
String.valueOf(e + lj)));
}
- ((BaseTransform<?,?>) combi.transform).setLinesInput(0);
- ((BaseTransform<?,?>) combi.transform).setLinesOutput(0);
- ((BaseTransform<?,?>) combi.transform).setLinesWritten(0);
- ((BaseTransform<?,?>) combi.transform).setLinesRead(0);
- ((BaseTransform<?,?>) combi.transform).setLinesSkipped(0);
- ((BaseTransform<?,?>) combi.transform).setLinesUpdated(0);
+ ((BaseTransform<?, ?>) combi.transform).setLinesInput(0);
+ ((BaseTransform<?, ?>) combi.transform).setLinesOutput(0);
+ ((BaseTransform<?, ?>) combi.transform).setLinesWritten(0);
+ ((BaseTransform<?, ?>) combi.transform).setLinesRead(0);
+ ((BaseTransform<?, ?>) combi.transform).setLinesSkipped(0);
+ ((BaseTransform<?, ?>) combi.transform).setLinesUpdated(0);
combi.transform.setLinesRejected(0);
}
}
@@ -460,6 +461,18 @@ public class SingleThreadedPipelineExecutor {
return pipeline.isStopped();
}
+ public void startBundle() throws HopException {
+ for (TransformMetaDataCombi combi : pipeline.getTransforms()) {
+ combi.transform.startBundle();
+ }
+ }
+
+ public void finishBundle() throws HopException {
+ for (TransformMetaDataCombi combi : pipeline.getTransforms()) {
+ combi.transform.finishBundle();
+ }
+ }
+
public void dispose() throws HopException {
// Call output done.
diff --git a/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java b/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java
index 8b1d22fe79..25c2ce3884 100644
--- a/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java
+++ b/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java
@@ -3679,14 +3679,18 @@ public class BaseTransform<Meta extends ITransformMeta, Data extends ITransformD
this.containerObjectId = containerObjectId;
}
- /*
- * (non-Javadoc)
- *
+ /**
* @see org.apache.hop.pipeline.transform.ITransform#batchComplete()
*/
@Override
public void batchComplete() throws HopException {}
+ @Override
+ public void startBundle() throws HopException {}
+
+ @Override
+ public void finishBundle() throws HopException {}
+
/**
* Returns the registration date
*
diff --git a/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransformData.java b/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransformData.java
index 0f3ac09e44..8518f990ca 100644
--- a/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransformData.java
+++ b/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransformData.java
@@ -30,6 +30,12 @@ public abstract class BaseTransformData implements ITransformData {
/** The status. */
private ComponentExecutionStatus status;
+ /** Set to true if the transform is running in a Beam pipeline */
+ private boolean beamContext;
+
+ /** The Beam bundle number */
+ private int beamBundleNr;
+
/** Instantiates a new base transform data. */
public BaseTransformData() {
status = ComponentExecutionStatus.STATUS_EMPTY;
@@ -123,4 +129,40 @@ public abstract class BaseTransformData implements ITransformData {
public boolean isDisposed() {
return status == ComponentExecutionStatus.STATUS_DISPOSED;
}
+
+ /**
+ * Gets beamContext
+ *
+ * @return value of beamContext
+ */
+ public boolean isBeamContext() {
+ return beamContext;
+ }
+
+ /**
+ * Sets beamContext
+ *
+ * @param beamContext value of beamContext
+ */
+ public void setBeamContext(boolean beamContext) {
+ this.beamContext = beamContext;
+ }
+
+ /**
+ * Gets beamBundleNr
+ *
+ * @return value of beamBundleNr
+ */
+ public int getBeamBundleNr() {
+ return beamBundleNr;
+ }
+
+ /**
+ * Sets beamBundleNr
+ *
+ * @param beamBundleNr value of beamBundleNr
+ */
+ public void setBeamBundleNr(int beamBundleNr) {
+ this.beamBundleNr = beamBundleNr;
+ }
}
diff --git a/engine/src/main/java/org/apache/hop/pipeline/transform/ITransform.java b/engine/src/main/java/org/apache/hop/pipeline/transform/ITransform.java
index 4b452a84e0..d1bb0af3ce 100644
--- a/engine/src/main/java/org/apache/hop/pipeline/transform/ITransform.java
+++ b/engine/src/main/java/org/apache/hop/pipeline/transform/ITransform.java
@@ -295,14 +295,27 @@ public interface ITransform
void setRepartitioning(int partitioningMethod);
/**
- * Calling this method will alert the transform that we finished passing a batch of records to the
- * transform. Specifically for transforms like "Sort Rows" it means that the buffered rows can be
- * sorted and passed on.
+ * When using the Single threaded engine this signals to the transform that a batch of records has been processed
+ * and that no more are expected in this batch.
*
* @throws HopException In case an error occurs during the processing of the batch of rows.
*/
void batchComplete() throws HopException;
+ /**
+ * When running in a Beam context this signals the transform that a new bundle was started.
+ * File writing transforms might want to create new files here.
+ * @throws HopException
+ */
+ void startBundle() throws HopException;
+
+ /**
+ * When running in a Beam context this signals the transform that a bundle was finished.
+ * File writing transforms will want to close open file(s) here.
+ * @throws HopException
+ */
+ void finishBundle() throws HopException;
+
/**
* Pass along the metadata to use when loading external elements at runtime.
*
diff --git a/engine/src/main/java/org/apache/hop/pipeline/transform/ITransformData.java b/engine/src/main/java/org/apache/hop/pipeline/transform/ITransformData.java
index b7d7875e15..7ed71de9c0 100644
--- a/engine/src/main/java/org/apache/hop/pipeline/transform/ITransformData.java
+++ b/engine/src/main/java/org/apache/hop/pipeline/transform/ITransformData.java
@@ -82,4 +82,33 @@ public interface ITransformData {
* @return true, if is disposed
*/
boolean isDisposed();
+
+
+ /**
+ * Gets beamContext
+ *
+ * @return value of beamContext
+ */
+ public boolean isBeamContext();
+
+ /**
+ * Sets beamContext
+ *
+ * @param beamContext value of beamContext
+ */
+ public void setBeamContext(boolean beamContext);
+
+ /**
+ * Gets beamBundleNr
+ *
+ * @return value of beamBundleNr
+ */
+ public int getBeamBundleNr();
+
+ /**
+ * Sets beamBundleNr
+ *
+ * @param beamBundleNr value of beamBundleNr
+ */
+ public void setBeamBundleNr(int beamBundleNr);
}
diff --git a/integration-tests/beam_directrunner/0005-generate-one-json-file-validation.hpl b/integration-tests/beam_directrunner/0005-generate-single-file-validation.hpl
similarity index 81%
copy from integration-tests/beam_directrunner/0005-generate-one-json-file-validation.hpl
copy to integration-tests/beam_directrunner/0005-generate-single-file-validation.hpl
index 8020f7c124..fd82aa1768 100644
--- a/integration-tests/beam_directrunner/0005-generate-one-json-file-validation.hpl
+++ b/integration-tests/beam_directrunner/0005-generate-single-file-validation.hpl
@@ -19,7 +19,7 @@ limitations under the License.
-->
<pipeline>
<info>
- <name>0005-generate-one-json-file-validation</name>
+ <name>0005-generate-single-file-validation</name>
<name_sync_with_filename>Y</name_sync_with_filename>
<description/>
<extended_description/>
@@ -41,23 +41,33 @@ limitations under the License.
</notepads>
<order>
<hop>
- <from>/tmp/0005/0005-*.json</from>
+ <from>/tmp/0005/*.json</from>
<to>json</to>
<enabled>Y</enabled>
</hop>
<hop>
- <from>/tmp/0005/0005-*.csv</from>
+ <from>/tmp/0005/*.csv</from>
<to>cvs</to>
<enabled>Y</enabled>
</hop>
<hop>
- <from>/tmp/0005/0005-*.xlsx</from>
+ <from>/tmp/0005/*.xlsx</from>
<to>xlsx</to>
<enabled>Y</enabled>
</hop>
+ <hop>
+ <from>/tmp/0005/*.parquet</from>
+ <to>read Parquet file</to>
+ <enabled>Y</enabled>
+ </hop>
+ <hop>
+ <from>read Parquet file</from>
+ <to>parquet</to>
+ <enabled>Y</enabled>
+ </hop>
</order>
<transform>
- <name>/tmp/0005/0005-*.csv</name>
+ <name>/tmp/0005/*.csv</name>
<type>TextFileInput2</type>
<description/>
<distribute>Y</distribute>
@@ -96,7 +106,7 @@ limitations under the License.
<add_to_result_filenames>Y</add_to_result_filenames>
<file>
<name>${java.io.tmpdir}/0005/</name>
- <filemask>0005-.*\.csv</filemask>
+ <filemask>.*\.csv</filemask>
<exclude_filemask/>
<file_required>N</file_required>
<include_subfolders>N</include_subfolders>
@@ -284,12 +294,12 @@ limitations under the License.
<sizeFieldName/>
<attributes/>
<GUI>
- <xloc>160</xloc>
- <yloc>144</yloc>
+ <xloc>144</xloc>
+ <yloc>128</yloc>
</GUI>
</transform>
<transform>
- <name>/tmp/0005/0005-*.json</name>
+ <name>/tmp/0005/*.json</name>
<type>JsonInput</type>
<description/>
<distribute>Y</distribute>
@@ -463,12 +473,45 @@ limitations under the License.
<sizeFieldName/>
<attributes/>
<GUI>
- <xloc>160</xloc>
- <yloc>64</yloc>
+ <xloc>144</xloc>
+ <yloc>48</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>/tmp/0005/*.parquet</name>
+ <type>GetFileNames</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <isaddresult>Y</isaddresult>
+ <doNotFailIfNoFile>N</doNotFailIfNoFile>
+ <dynamic_include_subfolders>N</dynamic_include_subfolders>
+ <filefield>N</filefield>
+ <file>
+ <filemask>.*\.parquet</filemask>
+ <name>${java.io.tmpdir}/0005/</name>
+ <file_required>N</file_required>
+ <include_subfolders>N</include_subfolders>
+ </file>
+ <filter>
+ <filterfiletype>all_files</filterfiletype>
+ </filter>
+ <rownum>N</rownum>
+ <raiseAnExceptionIfNoFile>N</raiseAnExceptionIfNoFile>
+ <limit>0</limit>
+ <attributes/>
+ <GUI>
+ <xloc>144</xloc>
+ <yloc>288</yloc>
</GUI>
</transform>
<transform>
- <name>/tmp/0005/0005-*.xlsx</name>
+ <name>/tmp/0005/*.xlsx</name>
<type>ExcelInput</type>
<description/>
<distribute>Y</distribute>
@@ -495,7 +538,7 @@ limitations under the License.
<accept_transform_name/>
<file>
<name>${java.io.tmpdir}/0005/</name>
- <filemask>0005-.*\.xlsx</filemask>
+ <filemask>.*\.xlsx</filemask>
<exclude_filemask/>
<file_required>N</file_required>
<include_subfolders>N</include_subfolders>
@@ -649,8 +692,8 @@ limitations under the License.
<spreadsheet_type>POI</spreadsheet_type>
<attributes/>
<GUI>
- <xloc>160</xloc>
- <yloc>224</yloc>
+ <xloc>144</xloc>
+ <yloc>208</yloc>
</GUI>
</transform>
<transform>
@@ -666,8 +709,8 @@ limitations under the License.
</partitioning>
<attributes/>
<GUI>
- <xloc>352</xloc>
- <yloc>144</yloc>
+ <xloc>512</xloc>
+ <yloc>128</yloc>
</GUI>
</transform>
<transform>
@@ -683,8 +726,95 @@ limitations under the License.
</partitioning>
<attributes/>
<GUI>
- <xloc>352</xloc>
- <yloc>64</yloc>
+ <xloc>512</xloc>
+ <yloc>48</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>parquet</name>
+ <type>Dummy</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <attributes/>
+ <GUI>
+ <xloc>512</xloc>
+ <yloc>288</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>read Parquet file</name>
+ <type>ParquetFileInput</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <fields>
+ <field>
+ <source_field>id</source_field>
+ <target_field>id</target_field>
+ <target_type>Integer</target_type>
+ </field>
+ <field>
+ <source_field>lastName</source_field>
+ <target_field>lastName</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>firstName</source_field>
+ <target_field>firstName</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>zipCode</source_field>
+ <target_field>zipCode</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>city</source_field>
+ <target_field>city</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>birthdate</source_field>
+ <target_field>birthdate</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>street</source_field>
+ <target_field>street</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>housenr</source_field>
+ <target_field>housenr</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>stateCode</source_field>
+ <target_field>stateCode</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>state</source_field>
+ <target_field>state</target_field>
+ <target_type>String</target_type>
+ </field>
+ </fields>
+ <filename_field>filename</filename_field>
+ <attributes/>
+ <GUI>
+ <xloc>320</xloc>
+ <yloc>288</yloc>
</GUI>
</transform>
<transform>
@@ -700,8 +830,8 @@ limitations under the License.
</partitioning>
<attributes/>
<GUI>
- <xloc>352</xloc>
- <yloc>224</yloc>
+ <xloc>512</xloc>
+ <yloc>208</yloc>
</GUI>
</transform>
<transform_error_handling>
diff --git a/integration-tests/beam_directrunner/0005-generate-one-json-file.hpl b/integration-tests/beam_directrunner/0005-generate-single-file.hpl
similarity index 74%
copy from integration-tests/beam_directrunner/0005-generate-one-json-file.hpl
copy to integration-tests/beam_directrunner/0005-generate-single-file.hpl
index 0dcd3e3641..d337409555 100644
--- a/integration-tests/beam_directrunner/0005-generate-one-json-file.hpl
+++ b/integration-tests/beam_directrunner/0005-generate-single-file.hpl
@@ -19,7 +19,7 @@ limitations under the License.
-->
<pipeline>
<info>
- <name>0005-generate-one-json-file</name>
+ <name>0005-generate-single-file</name>
<name_sync_with_filename>Y</name_sync_with_filename>
<description/>
<extended_description/>
@@ -41,33 +41,79 @@ limitations under the License.
</notepads>
<order>
<hop>
- <from>input/customers-noheader-1k.txt</from>
+ <from>parking/customers-noheader-1k.txt</from>
<to>FL and CA</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>FL and CA</from>
- <to>0005-customers-ca-fl.json</to>
+ <to>customers-ca-fl.json</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>FL and CA</from>
- <to>0005-customers-ca-fl.csv</to>
+ <to>customers-ca-fl.csv</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>FL and CA</from>
- <to>0007-customers-ca-fl.xlsx</to>
+ <to>customers-ca-fl.xlsx</to>
+ <enabled>Y</enabled>
+ </hop>
+ <hop>
+ <from>FL and CA</from>
+ <to>customers-fl-ca.parquet</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
- <name>0005-customers-ca-fl.csv</name>
+ <name>FL and CA</name>
+ <type>FilterRows</type>
+ <description/>
+ <distribute>N</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <send_true_to/>
+ <send_false_to/>
+ <compare>
+ <condition>
+ <negated>N</negated>
+ <conditions>
+ <condition>
+ <negated>N</negated>
+ <leftvalue>stateCode</leftvalue>
+ <function>IN LIST</function>
+ <rightvalue/>
+ <value>
+ <name>constant</name>
+ <type>String</type>
+ <text>FL;CA</text>
+ <length>-1</length>
+ <precision>-1</precision>
+ <isnull>N</isnull>
+ <mask/>
+ </value>
+ </condition>
+ </conditions>
+ </condition>
+ </compare>
+ <attributes/>
+ <GUI>
+ <xloc>336</xloc>
+ <yloc>64</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>customers-ca-fl.csv</name>
<type>TextFileOutput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
- <copies>BEAM_SINGLE</copies>
+ <copies>SINGLE_BEAM</copies>
<partitioning>
<method>none</method>
<schema_name/>
@@ -86,9 +132,9 @@ limitations under the License.
<fileNameField/>
<create_parent_folder>Y</create_parent_folder>
<file>
- <name>${java.io.tmpdir}/0005/0005-customers-ca-fl-${Internal.Transform.ID}</name>
+ <name>${java.io.tmpdir}/0005/customers-ca-fl</name>
<servlet_output>N</servlet_output>
- <do_not_open_new_file_init>N</do_not_open_new_file_init>
+ <do_not_open_new_file_init>Y</do_not_open_new_file_init>
<extention>csv</extention>
<append>N</append>
<split>N</split>
@@ -226,17 +272,17 @@ limitations under the License.
</fields>
<attributes/>
<GUI>
- <xloc>592</xloc>
- <yloc>224</yloc>
+ <xloc>720</xloc>
+ <yloc>112</yloc>
</GUI>
</transform>
<transform>
- <name>0005-customers-ca-fl.json</name>
+ <name>customers-ca-fl.json</name>
<type>JsonOutput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
- <copies>BEAM_SINGLE</copies>
+ <copies>SINGLE_BEAM</copies>
<partitioning>
<method>none</method>
<schema_name/>
@@ -244,11 +290,12 @@ limitations under the License.
<addToResult>N</addToResult>
<createParentFolder>Y</createParentFolder>
<dateInFilename>N</dateInFilename>
+ <doNotOpenNewFileInit>Y</doNotOpenNewFileInit>
<encoding>UTF-8</encoding>
<extension>json</extension>
<fileAppended>N</fileAppended>
<fileAsCommand>N</fileAsCommand>
- <fileName>${java.io.tmpdir}/0005/0005-customers-ca-fl-${Internal.Transform.ID}.json</fileName>
+ <fileName>${java.io.tmpdir}/0005/customers-ca-fl</fileName>
<jsonBloc>customers</jsonBloc>
<nrRowsInBloc>5000</nrRowsInBloc>
<operation_type>writetofile</operation_type>
@@ -301,17 +348,17 @@ limitations under the License.
<transformNrInFilename>N</transformNrInFilename>
<attributes/>
<GUI>
- <xloc>592</xloc>
- <yloc>96</yloc>
+ <xloc>624</xloc>
+ <yloc>144</yloc>
</GUI>
</transform>
<transform>
- <name>0007-customers-ca-fl.xlsx</name>
+ <name>customers-ca-fl.xlsx</name>
<type>TypeExitExcelWriterTransform</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
- <copies>SINGLE_BEAM</copies>
+ <copies>BEAM_SINGLE</copies>
<partitioning>
<method>none</method>
<schema_name/>
@@ -325,9 +372,9 @@ limitations under the License.
<autosizecolums>Y</autosizecolums>
<createParentFolder>Y</createParentFolder>
<add_date>N</add_date>
- <do_not_open_newfile_init>N</do_not_open_newfile_init>
+ <do_not_open_newfile_init>Y</do_not_open_newfile_init>
<extension>xlsx</extension>
- <name>${java.io.tmpdir}/0005/0005-customers-ca-fl-${Internal.Transform.ID}</name>
+ <name>${java.io.tmpdir}/0005/customers-ca-fl</name>
<filename_in_field>N</filename_in_field>
<if_file_exists>new</if_file_exists>
<if_sheet_exists>new</if_sheet_exists>
@@ -417,48 +464,86 @@ limitations under the License.
</template>
<attributes/>
<GUI>
- <xloc>592</xloc>
- <yloc>352</yloc>
+ <xloc>816</xloc>
+ <yloc>64</yloc>
</GUI>
</transform>
<transform>
- <name>FL and CA</name>
- <type>FilterRows</type>
+ <name>customers-fl-ca.parquet</name>
+ <type>ParquetFileOutput</type>
<description/>
- <distribute>N</distribute>
+ <distribute>Y</distribute>
<custom_distribution/>
- <copies>1</copies>
+ <copies>SINGLE_BEAM</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
- <send_true_to/>
- <send_false_to/>
- <compare>
- <condition>
- <negated>N</negated>
- <leftvalue>stateCode</leftvalue>
- <function>IN LIST</function>
- <rightvalue/>
- <value>
- <name>constant</name>
- <type>String</type>
- <text>FL;CA</text>
- <length>-1</length>
- <precision>-1</precision>
- <isnull>N</isnull>
- <mask/>
- </value>
- </condition>
- </compare>
+ <compression_codec>UNCOMPRESSED</compression_codec>
+ <data_page_size>1048576</data_page_size>
+ <dictionary_page_size>1048576</dictionary_page_size>
+ <fields>
+ <field>
+ <source_field>id</source_field>
+ <target_field>id</target_field>
+ </field>
+ <field>
+ <source_field>Last name</source_field>
+ <target_field>lastName</target_field>
+ </field>
+ <field>
+ <source_field>First name</source_field>
+ <target_field>firstName</target_field>
+ </field>
+ <field>
+ <source_field>cust_zip_code</source_field>
+ <target_field>zipCode</target_field>
+ </field>
+ <field>
+ <source_field>city</source_field>
+ <target_field>city</target_field>
+ </field>
+ <field>
+ <source_field>birthdate</source_field>
+ <target_field>birthdate</target_field>
+ </field>
+ <field>
+ <source_field>street</source_field>
+ <target_field>street</target_field>
+ </field>
+ <field>
+ <source_field>housenr</source_field>
+ <target_field>housenr</target_field>
+ </field>
+ <field>
+ <source_field>stateCode</source_field>
+ <target_field>stateCode</target_field>
+ </field>
+ <field>
+ <source_field>state</source_field>
+ <target_field>state</target_field>
+ </field>
+ </fields>
+ <filename_split_size>1000000</filename_split_size>
+ <filename_base>${java.io.tmpdir}/0005/customers-fl-ca</filename_base>
+ <filename_create_parent_folders>Y</filename_create_parent_folders>
+ <filename_datetime_format>yyyyMMdd-HHmmss</filename_datetime_format>
+ <filename_ext>parquet</filename_ext>
+ <filename_include_copy>N</filename_include_copy>
+ <filename_include_date>N</filename_include_date>
+ <filename_include_datetime>N</filename_include_datetime>
+ <filename_include_split>Y</filename_include_split>
+ <filename_include_time>N</filename_include_time>
+ <row_group_size>20000</row_group_size>
+ <version>2.0</version>
<attributes/>
<GUI>
- <xloc>336</xloc>
- <yloc>224</yloc>
+ <xloc>512</xloc>
+ <yloc>176</yloc>
</GUI>
</transform>
<transform>
- <name>input/customers-noheader-1k.txt</name>
+ <name>parking/customers-noheader-1k.txt</name>
<type>BeamInput</type>
<description/>
<distribute>Y</distribute>
@@ -473,7 +558,7 @@ limitations under the License.
<attributes/>
<GUI>
<xloc>144</xloc>
- <yloc>224</yloc>
+ <yloc>64</yloc>
</GUI>
</transform>
<transform_error_handling>
diff --git a/integration-tests/beam_directrunner/0005-generate-one-json-file-validation.hpl b/integration-tests/beam_directrunner/0006-generate-bundle-files-validation.hpl
similarity index 81%
rename from integration-tests/beam_directrunner/0005-generate-one-json-file-validation.hpl
rename to integration-tests/beam_directrunner/0006-generate-bundle-files-validation.hpl
index 8020f7c124..d9dbed5b3d 100644
--- a/integration-tests/beam_directrunner/0005-generate-one-json-file-validation.hpl
+++ b/integration-tests/beam_directrunner/0006-generate-bundle-files-validation.hpl
@@ -19,7 +19,7 @@ limitations under the License.
-->
<pipeline>
<info>
- <name>0005-generate-one-json-file-validation</name>
+ <name>0006-generate-bundle-files-validation</name>
<name_sync_with_filename>Y</name_sync_with_filename>
<description/>
<extended_description/>
@@ -41,23 +41,33 @@ limitations under the License.
</notepads>
<order>
<hop>
- <from>/tmp/0005/0005-*.json</from>
+ <from>/tmp/0006/*.json</from>
<to>json</to>
<enabled>Y</enabled>
</hop>
<hop>
- <from>/tmp/0005/0005-*.csv</from>
+ <from>/tmp/0006/*.csv</from>
<to>cvs</to>
<enabled>Y</enabled>
</hop>
<hop>
- <from>/tmp/0005/0005-*.xlsx</from>
+ <from>/tmp/0006/*.xlsx</from>
<to>xlsx</to>
<enabled>Y</enabled>
</hop>
+ <hop>
+ <from>/tmp/0006/*.parquet</from>
+ <to>read Parquet file</to>
+ <enabled>Y</enabled>
+ </hop>
+ <hop>
+ <from>read Parquet file</from>
+ <to>parquet</to>
+ <enabled>Y</enabled>
+ </hop>
</order>
<transform>
- <name>/tmp/0005/0005-*.csv</name>
+ <name>/tmp/0006/*.csv</name>
<type>TextFileInput2</type>
<description/>
<distribute>Y</distribute>
@@ -95,8 +105,8 @@ limitations under the License.
<length>Characters</length>
<add_to_result_filenames>Y</add_to_result_filenames>
<file>
- <name>${java.io.tmpdir}/0005/</name>
- <filemask>0005-.*\.csv</filemask>
+ <name>${java.io.tmpdir}/0006/</name>
+ <filemask>.*\.csv</filemask>
<exclude_filemask/>
<file_required>N</file_required>
<include_subfolders>N</include_subfolders>
@@ -289,7 +299,7 @@ limitations under the License.
</GUI>
</transform>
<transform>
- <name>/tmp/0005/0005-*.json</name>
+ <name>/tmp/0006/*.json</name>
<type>JsonInput</type>
<description/>
<distribute>Y</distribute>
@@ -311,7 +321,7 @@ limitations under the License.
<defaultPathLeafToNull>Y</defaultPathLeafToNull>
<rownum_field/>
<file>
- <name>${java.io.tmpdir}/0005/</name>
+ <name>${java.io.tmpdir}/0006/</name>
<filemask>.*\.json</filemask>
<exclude_filemask/>
<file_required>N</file_required>
@@ -468,7 +478,7 @@ limitations under the License.
</GUI>
</transform>
<transform>
- <name>/tmp/0005/0005-*.xlsx</name>
+ <name>/tmp/0006/*.xlsx</name>
<type>ExcelInput</type>
<description/>
<distribute>Y</distribute>
@@ -494,8 +504,8 @@ limitations under the License.
<accept_field/>
<accept_transform_name/>
<file>
- <name>${java.io.tmpdir}/0005/</name>
- <filemask>0005-.*\.xlsx</filemask>
+ <name>${java.io.tmpdir}/0006/</name>
+ <filemask>.*\.xlsx</filemask>
<exclude_filemask/>
<file_required>N</file_required>
<include_subfolders>N</include_subfolders>
@@ -653,6 +663,44 @@ limitations under the License.
<yloc>224</yloc>
</GUI>
</transform>
+ <transform>
+ <name>/tmp/0006/*.parquet</name>
+ <type>GetFileNames</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <isaddresult>Y</isaddresult>
+ <doNotFailIfNoFile>N</doNotFailIfNoFile>
+ <exclude_wildcard_Field/>
+ <filename_Field/>
+ <dynamic_include_subfolders>N</dynamic_include_subfolders>
+ <wildcard_Field/>
+ <filefield>N</filefield>
+ <file>
+ <exclude_filemask/>
+ <filemask>.*\.parquet</filemask>
+ <name>${java.io.tmpdir}/0006/</name>
+ <file_required>N</file_required>
+ <include_subfolders>N</include_subfolders>
+ </file>
+ <filter>
+ <filterfiletype>all_files</filterfiletype>
+ </filter>
+ <rownum>N</rownum>
+ <raiseAnExceptionIfNoFile>N</raiseAnExceptionIfNoFile>
+ <limit>0</limit>
+ <rownum_field/>
+ <attributes/>
+ <GUI>
+ <xloc>160</xloc>
+ <yloc>304</yloc>
+ </GUI>
+ </transform>
<transform>
<name>cvs</name>
<type>Dummy</type>
@@ -666,7 +714,7 @@ limitations under the License.
</partitioning>
<attributes/>
<GUI>
- <xloc>352</xloc>
+ <xloc>528</xloc>
<yloc>144</yloc>
</GUI>
</transform>
@@ -683,10 +731,97 @@ limitations under the License.
</partitioning>
<attributes/>
<GUI>
- <xloc>352</xloc>
+ <xloc>528</xloc>
<yloc>64</yloc>
</GUI>
</transform>
+ <transform>
+ <name>parquet</name>
+ <type>Dummy</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <attributes/>
+ <GUI>
+ <xloc>528</xloc>
+ <yloc>304</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>read Parquet file</name>
+ <type>ParquetFileInput</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <fields>
+ <field>
+ <source_field>id</source_field>
+ <target_field>id</target_field>
+ <target_type>Integer</target_type>
+ </field>
+ <field>
+ <source_field>lastName</source_field>
+ <target_field>lastName</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>firstName</source_field>
+ <target_field>firstName</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>zipCode</source_field>
+ <target_field>zipCode</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>city</source_field>
+ <target_field>city</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>birthdate</source_field>
+ <target_field>birthdate</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>street</source_field>
+ <target_field>street</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>housenr</source_field>
+ <target_field>housenr</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>stateCode</source_field>
+ <target_field>stateCode</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>state</source_field>
+ <target_field>state</target_field>
+ <target_type>String</target_type>
+ </field>
+ </fields>
+ <filename_field>filename</filename_field>
+ <attributes/>
+ <GUI>
+ <xloc>336</xloc>
+ <yloc>304</yloc>
+ </GUI>
+ </transform>
<transform>
<name>xlsx</name>
<type>Dummy</type>
@@ -700,7 +835,7 @@ limitations under the License.
</partitioning>
<attributes/>
<GUI>
- <xloc>352</xloc>
+ <xloc>528</xloc>
<yloc>224</yloc>
</GUI>
</transform>
diff --git a/integration-tests/beam_directrunner/0005-generate-one-json-file.hpl b/integration-tests/beam_directrunner/0006-generate-bundle-files.hpl
similarity index 74%
rename from integration-tests/beam_directrunner/0005-generate-one-json-file.hpl
rename to integration-tests/beam_directrunner/0006-generate-bundle-files.hpl
index 0dcd3e3641..f2f8ef86e3 100644
--- a/integration-tests/beam_directrunner/0005-generate-one-json-file.hpl
+++ b/integration-tests/beam_directrunner/0006-generate-bundle-files.hpl
@@ -19,7 +19,7 @@ limitations under the License.
-->
<pipeline>
<info>
- <name>0005-generate-one-json-file</name>
+ <name>0006-generate-bundle-files</name>
<name_sync_with_filename>Y</name_sync_with_filename>
<description/>
<extended_description/>
@@ -41,33 +41,79 @@ limitations under the License.
</notepads>
<order>
<hop>
- <from>input/customers-noheader-1k.txt</from>
+ <from>parking/customers-noheader-1k.txt</from>
<to>FL and CA</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>FL and CA</from>
- <to>0005-customers-ca-fl.json</to>
+ <to>customers-ca-fl.json</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>FL and CA</from>
- <to>0005-customers-ca-fl.csv</to>
+ <to>customers-ca-fl.csv</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>FL and CA</from>
- <to>0007-customers-ca-fl.xlsx</to>
+ <to>customers-ca-fl.xlsx</to>
+ <enabled>Y</enabled>
+ </hop>
+ <hop>
+ <from>FL and CA</from>
+ <to>customers-fl-ca.parquet</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
- <name>0005-customers-ca-fl.csv</name>
+ <name>FL and CA</name>
+ <type>FilterRows</type>
+ <description/>
+ <distribute>N</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <send_true_to/>
+ <send_false_to/>
+ <compare>
+ <condition>
+ <negated>N</negated>
+ <conditions>
+ <condition>
+ <negated>N</negated>
+ <leftvalue>stateCode</leftvalue>
+ <function>IN LIST</function>
+ <rightvalue/>
+ <value>
+ <name>constant</name>
+ <type>String</type>
+ <text>FL;CA</text>
+ <length>-1</length>
+ <precision>-1</precision>
+ <isnull>N</isnull>
+ <mask/>
+ </value>
+ </condition>
+ </conditions>
+ </condition>
+ </compare>
+ <attributes/>
+ <GUI>
+ <xloc>336</xloc>
+ <yloc>64</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>customers-ca-fl.csv</name>
<type>TextFileOutput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
- <copies>BEAM_SINGLE</copies>
+ <copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
@@ -86,9 +132,9 @@ limitations under the License.
<fileNameField/>
<create_parent_folder>Y</create_parent_folder>
<file>
- <name>${java.io.tmpdir}/0005/0005-customers-ca-fl-${Internal.Transform.ID}</name>
+ <name>${java.io.tmpdir}/0006/customers-ca-fl</name>
<servlet_output>N</servlet_output>
- <do_not_open_new_file_init>N</do_not_open_new_file_init>
+ <do_not_open_new_file_init>Y</do_not_open_new_file_init>
<extention>csv</extention>
<append>N</append>
<split>N</split>
@@ -226,17 +272,17 @@ limitations under the License.
</fields>
<attributes/>
<GUI>
- <xloc>592</xloc>
- <yloc>224</yloc>
+ <xloc>752</xloc>
+ <yloc>96</yloc>
</GUI>
</transform>
<transform>
- <name>0005-customers-ca-fl.json</name>
+ <name>customers-ca-fl.json</name>
<type>JsonOutput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
- <copies>BEAM_SINGLE</copies>
+ <copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
@@ -244,11 +290,12 @@ limitations under the License.
<addToResult>N</addToResult>
<createParentFolder>Y</createParentFolder>
<dateInFilename>N</dateInFilename>
+ <doNotOpenNewFileInit>Y</doNotOpenNewFileInit>
<encoding>UTF-8</encoding>
<extension>json</extension>
<fileAppended>N</fileAppended>
<fileAsCommand>N</fileAsCommand>
- <fileName>${java.io.tmpdir}/0005/0005-customers-ca-fl-${Internal.Transform.ID}.json</fileName>
+ <fileName>${java.io.tmpdir}/0006/customers-ca-fl</fileName>
<jsonBloc>customers</jsonBloc>
<nrRowsInBloc>5000</nrRowsInBloc>
<operation_type>writetofile</operation_type>
@@ -301,17 +348,17 @@ limitations under the License.
<transformNrInFilename>N</transformNrInFilename>
<attributes/>
<GUI>
- <xloc>592</xloc>
- <yloc>96</yloc>
+ <xloc>624</xloc>
+ <yloc>128</yloc>
</GUI>
</transform>
<transform>
- <name>0007-customers-ca-fl.xlsx</name>
+ <name>customers-ca-fl.xlsx</name>
<type>TypeExitExcelWriterTransform</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
- <copies>SINGLE_BEAM</copies>
+ <copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
@@ -325,9 +372,9 @@ limitations under the License.
<autosizecolums>Y</autosizecolums>
<createParentFolder>Y</createParentFolder>
<add_date>N</add_date>
- <do_not_open_newfile_init>N</do_not_open_newfile_init>
+ <do_not_open_newfile_init>Y</do_not_open_newfile_init>
<extension>xlsx</extension>
- <name>${java.io.tmpdir}/0005/0005-customers-ca-fl-${Internal.Transform.ID}</name>
+ <name>${java.io.tmpdir}/0006/customers-ca-fl</name>
<filename_in_field>N</filename_in_field>
<if_file_exists>new</if_file_exists>
<if_sheet_exists>new</if_sheet_exists>
@@ -417,48 +464,86 @@ limitations under the License.
</template>
<attributes/>
<GUI>
- <xloc>592</xloc>
- <yloc>352</yloc>
+ <xloc>864</xloc>
+ <yloc>64</yloc>
</GUI>
</transform>
<transform>
- <name>FL and CA</name>
- <type>FilterRows</type>
+ <name>customers-fl-ca.parquet</name>
+ <type>ParquetFileOutput</type>
<description/>
- <distribute>N</distribute>
+ <distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
- <send_true_to/>
- <send_false_to/>
- <compare>
- <condition>
- <negated>N</negated>
- <leftvalue>stateCode</leftvalue>
- <function>IN LIST</function>
- <rightvalue/>
- <value>
- <name>constant</name>
- <type>String</type>
- <text>FL;CA</text>
- <length>-1</length>
- <precision>-1</precision>
- <isnull>N</isnull>
- <mask/>
- </value>
- </condition>
- </compare>
+ <compression_codec>UNCOMPRESSED</compression_codec>
+ <data_page_size>1048576</data_page_size>
+ <dictionary_page_size>1048576</dictionary_page_size>
+ <fields>
+ <field>
+ <source_field>id</source_field>
+ <target_field>id</target_field>
+ </field>
+ <field>
+ <source_field>Last name</source_field>
+ <target_field>lastName</target_field>
+ </field>
+ <field>
+ <source_field>First name</source_field>
+ <target_field>firstName</target_field>
+ </field>
+ <field>
+ <source_field>cust_zip_code</source_field>
+ <target_field>zipCode</target_field>
+ </field>
+ <field>
+ <source_field>city</source_field>
+ <target_field>city</target_field>
+ </field>
+ <field>
+ <source_field>birthdate</source_field>
+ <target_field>birthdate</target_field>
+ </field>
+ <field>
+ <source_field>street</source_field>
+ <target_field>street</target_field>
+ </field>
+ <field>
+ <source_field>housenr</source_field>
+ <target_field>housenr</target_field>
+ </field>
+ <field>
+ <source_field>stateCode</source_field>
+ <target_field>stateCode</target_field>
+ </field>
+ <field>
+ <source_field>state</source_field>
+ <target_field>state</target_field>
+ </field>
+ </fields>
+ <filename_split_size>1000000</filename_split_size>
+ <filename_base>${java.io.tmpdir}/0006/customers-fl-ca</filename_base>
+ <filename_create_parent_folders>Y</filename_create_parent_folders>
+ <filename_datetime_format>yyyyMMdd-HHmmss</filename_datetime_format>
+ <filename_ext>parquet</filename_ext>
+ <filename_include_copy>N</filename_include_copy>
+ <filename_include_date>N</filename_include_date>
+ <filename_include_datetime>N</filename_include_datetime>
+ <filename_include_split>Y</filename_include_split>
+ <filename_include_time>N</filename_include_time>
+ <row_group_size>20000</row_group_size>
+ <version>2.0</version>
<attributes/>
<GUI>
- <xloc>336</xloc>
- <yloc>224</yloc>
+ <xloc>512</xloc>
+ <yloc>160</yloc>
</GUI>
</transform>
<transform>
- <name>input/customers-noheader-1k.txt</name>
+ <name>parking/customers-noheader-1k.txt</name>
<type>BeamInput</type>
<description/>
<distribute>Y</distribute>
@@ -473,7 +558,7 @@ limitations under the License.
<attributes/>
<GUI>
<xloc>144</xloc>
- <yloc>224</yloc>
+ <yloc>64</yloc>
</GUI>
</transform>
<transform_error_handling>
diff --git a/integration-tests/beam_directrunner/main-0005-single-thread.hwf b/integration-tests/beam_directrunner/main-0005-single-thread.hwf
index 4e16a2336d..db94e1a5f9 100644
--- a/integration-tests/beam_directrunner/main-0005-single-thread.hwf
+++ b/integration-tests/beam_directrunner/main-0005-single-thread.hwf
@@ -44,7 +44,7 @@ limitations under the License.
<weekDay>1</weekDay>
<DayOfMonth>1</DayOfMonth>
<parallel>N</parallel>
- <xloc>144</xloc>
+ <xloc>112</xloc>
<yloc>96</yloc>
<attributes_hac/>
</action>
@@ -73,7 +73,7 @@ limitations under the License.
<attributes/>
<test_names>
<test_name>
- <name>0005-generate-one-json-file-validation UNIT</name>
+ <name>0005-generate-single-file-validation UNIT</name>
</test_name>
</test_names>
<parallel>N</parallel>
@@ -82,11 +82,11 @@ limitations under the License.
<attributes_hac/>
</action>
<action>
- <name>0005-generate-one-json-file</name>
+ <name>0005-generate-single-file.hpl</name>
<description/>
<type>PIPELINE</type>
<attributes/>
- <filename>${PROJECT_HOME}/0005-generate-one-json-file.hpl</filename>
+ <filename>${PROJECT_HOME}/0005-generate-single-file.hpl</filename>
<params_from_previous>N</params_from_previous>
<exec_per_row>N</exec_per_row>
<clear_rows>N</clear_rows>
@@ -120,13 +120,13 @@ limitations under the License.
</hop>
<hop>
<from>delete /tmp/0005/*</from>
- <to>0005-generate-one-json-file</to>
+ <to>0005-generate-single-file.hpl</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>N</unconditional>
</hop>
<hop>
- <from>0005-generate-one-json-file</from>
+ <from>0005-generate-single-file.hpl</from>
<to>Run Pipeline Unit Tests</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
diff --git a/integration-tests/beam_directrunner/main-0005-single-thread.hwf b/integration-tests/beam_directrunner/main-0006-generate-bundle-files.hwf
similarity index 88%
copy from integration-tests/beam_directrunner/main-0005-single-thread.hwf
copy to integration-tests/beam_directrunner/main-0006-generate-bundle-files.hwf
index 4e16a2336d..d69f4fa3b6 100644
--- a/integration-tests/beam_directrunner/main-0005-single-thread.hwf
+++ b/integration-tests/beam_directrunner/main-0006-generate-bundle-files.hwf
@@ -18,7 +18,7 @@ limitations under the License.
-->
<workflow>
- <name>main-0005-single-thread</name>
+ <name>main-0006-generate-bundle-files</name>
<name_sync_with_filename>Y</name_sync_with_filename>
<description/>
<extended_description/>
@@ -49,7 +49,7 @@ limitations under the License.
<attributes_hac/>
</action>
<action>
- <name>delete /tmp/0005/*</name>
+ <name>delete /tmp/0006/*</name>
<description/>
<type>DELETE_FILES</type>
<attributes/>
@@ -57,7 +57,7 @@ limitations under the License.
<include_subfolders>N</include_subfolders>
<fields>
<field>
- <name>${java.io.tmpdir}/0005/</name>
+ <name>${java.io.tmpdir}/0006/</name>
<filemask>.*</filemask>
</field>
</fields>
@@ -73,7 +73,7 @@ limitations under the License.
<attributes/>
<test_names>
<test_name>
- <name>0005-generate-one-json-file-validation UNIT</name>
+ <name>0006-generate-bundle-files-validation UNIT</name>
</test_name>
</test_names>
<parallel>N</parallel>
@@ -82,11 +82,11 @@ limitations under the License.
<attributes_hac/>
</action>
<action>
- <name>0005-generate-one-json-file</name>
+ <name>0006-generate-bundle-files.hpl</name>
<description/>
<type>PIPELINE</type>
<attributes/>
- <filename>${PROJECT_HOME}/0005-generate-one-json-file.hpl</filename>
+ <filename>${PROJECT_HOME}/0006-generate-bundle-files.hpl</filename>
<params_from_previous>N</params_from_previous>
<exec_per_row>N</exec_per_row>
<clear_rows>N</clear_rows>
@@ -113,20 +113,20 @@ limitations under the License.
<hops>
<hop>
<from>Start</from>
- <to>delete /tmp/0005/*</to>
+ <to>delete /tmp/0006/*</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>Y</unconditional>
</hop>
<hop>
- <from>delete /tmp/0005/*</from>
- <to>0005-generate-one-json-file</to>
+ <from>delete /tmp/0006/*</from>
+ <to>0006-generate-bundle-files.hpl</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>N</unconditional>
</hop>
<hop>
- <from>0005-generate-one-json-file</from>
+ <from>0006-generate-bundle-files.hpl</from>
<to>Run Pipeline Unit Tests</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
diff --git a/integration-tests/beam_directrunner/metadata/unit-test/0005-generate-one-json-file-validation UNIT.json b/integration-tests/beam_directrunner/metadata/unit-test/0005-generate-single-file-validation UNIT.json
similarity index 74%
copy from integration-tests/beam_directrunner/metadata/unit-test/0005-generate-one-json-file-validation UNIT.json
copy to integration-tests/beam_directrunner/metadata/unit-test/0005-generate-single-file-validation UNIT.json
index e807d41644..b53bbb4463 100644
--- a/integration-tests/beam_directrunner/metadata/unit-test/0005-generate-one-json-file-validation UNIT.json
+++ b/integration-tests/beam_directrunner/metadata/unit-test/0005-generate-single-file-validation UNIT.json
@@ -60,11 +60,11 @@
"data_set_field": "id"
},
{
- "transform_field": "Last name",
+ "transform_field": "Last_name",
"data_set_field": "Last name"
},
{
- "transform_field": "First name",
+ "transform_field": "First_name",
"data_set_field": "First name"
},
{
@@ -99,7 +99,7 @@
"field_order": [
"id"
],
- "transform_name": "xlsx",
+ "transform_name": "cvs",
"data_set_name": "0005-generate-one-json-file-golden"
},
{
@@ -109,11 +109,11 @@
"data_set_field": "id"
},
{
- "transform_field": "Last_name",
+ "transform_field": "Last name",
"data_set_field": "Last name"
},
{
- "transform_field": "First_name",
+ "transform_field": "First name",
"data_set_field": "First name"
},
{
@@ -148,15 +148,64 @@
"field_order": [
"id"
],
- "transform_name": "cvs",
+ "transform_name": "xlsx",
+ "data_set_name": "0005-generate-one-json-file-golden"
+ },
+ {
+ "field_mappings": [
+ {
+ "transform_field": "lastName",
+ "data_set_field": "Last name"
+ },
+ {
+ "transform_field": "firstName",
+ "data_set_field": "First name"
+ },
+ {
+ "transform_field": "birthdate",
+ "data_set_field": "birthdate"
+ },
+ {
+ "transform_field": "zipCode",
+ "data_set_field": "cust_zip_code"
+ },
+ {
+ "transform_field": "city",
+ "data_set_field": "city"
+ },
+ {
+ "transform_field": "id",
+ "data_set_field": "id"
+ },
+ {
+ "transform_field": "street",
+ "data_set_field": "street"
+ },
+ {
+ "transform_field": "housenr",
+ "data_set_field": "housenr"
+ },
+ {
+ "transform_field": "stateCode",
+ "data_set_field": "stateCode"
+ },
+ {
+ "transform_field": "state",
+ "data_set_field": "state"
+ }
+ ],
+ "field_order": [
+ "id"
+ ],
+ "transform_name": "parquet",
"data_set_name": "0005-generate-one-json-file-golden"
}
],
"input_data_sets": [],
- "name": "0005-generate-one-json-file-validation UNIT",
+ "name": "0005-generate-single-file-validation UNIT",
"description": "",
"trans_test_tweaks": [],
"persist_filename": "",
- "pipeline_filename": "./0005-generate-one-json-file-validation.hpl",
+ "pipeline_filename": "./0005-generate-single-file-validation.hpl",
"test_type": "UNIT_TEST"
}
\ No newline at end of file
diff --git a/integration-tests/beam_directrunner/metadata/unit-test/0005-generate-one-json-file-validation UNIT.json b/integration-tests/beam_directrunner/metadata/unit-test/0006-generate-bundle-files-validation UNIT.json
similarity index 74%
rename from integration-tests/beam_directrunner/metadata/unit-test/0005-generate-one-json-file-validation UNIT.json
rename to integration-tests/beam_directrunner/metadata/unit-test/0006-generate-bundle-files-validation UNIT.json
index e807d41644..e680a896eb 100644
--- a/integration-tests/beam_directrunner/metadata/unit-test/0005-generate-one-json-file-validation UNIT.json
+++ b/integration-tests/beam_directrunner/metadata/unit-test/0006-generate-bundle-files-validation UNIT.json
@@ -60,11 +60,11 @@
"data_set_field": "id"
},
{
- "transform_field": "Last name",
+ "transform_field": "Last_name",
"data_set_field": "Last name"
},
{
- "transform_field": "First name",
+ "transform_field": "First_name",
"data_set_field": "First name"
},
{
@@ -99,7 +99,7 @@
"field_order": [
"id"
],
- "transform_name": "xlsx",
+ "transform_name": "cvs",
"data_set_name": "0005-generate-one-json-file-golden"
},
{
@@ -109,11 +109,11 @@
"data_set_field": "id"
},
{
- "transform_field": "Last_name",
+ "transform_field": "Last name",
"data_set_field": "Last name"
},
{
- "transform_field": "First_name",
+ "transform_field": "First name",
"data_set_field": "First name"
},
{
@@ -148,15 +148,64 @@
"field_order": [
"id"
],
- "transform_name": "cvs",
+ "transform_name": "xlsx",
+ "data_set_name": "0005-generate-one-json-file-golden"
+ },
+ {
+ "field_mappings": [
+ {
+ "transform_field": "id",
+ "data_set_field": "id"
+ },
+ {
+ "transform_field": "firstName",
+ "data_set_field": "First name"
+ },
+ {
+ "transform_field": "lastName",
+ "data_set_field": "Last name"
+ },
+ {
+ "transform_field": "zipCode",
+ "data_set_field": "cust_zip_code"
+ },
+ {
+ "transform_field": "city",
+ "data_set_field": "city"
+ },
+ {
+ "transform_field": "birthdate",
+ "data_set_field": "birthdate"
+ },
+ {
+ "transform_field": "street",
+ "data_set_field": "street"
+ },
+ {
+ "transform_field": "housenr",
+ "data_set_field": "housenr"
+ },
+ {
+ "transform_field": "stateCode",
+ "data_set_field": "stateCode"
+ },
+ {
+ "transform_field": "state",
+ "data_set_field": "state"
+ }
+ ],
+ "field_order": [
+ "id"
+ ],
+ "transform_name": "parquet",
"data_set_name": "0005-generate-one-json-file-golden"
}
],
"input_data_sets": [],
- "name": "0005-generate-one-json-file-validation UNIT",
+ "name": "0006-generate-bundle-files-validation UNIT",
"description": "",
"trans_test_tweaks": [],
"persist_filename": "",
- "pipeline_filename": "./0005-generate-one-json-file-validation.hpl",
+ "pipeline_filename": "./0006-generate-bundle-files-validation.hpl",
"test_type": "UNIT_TEST"
}
\ No newline at end of file
diff --git a/integration-tests/gcp/0007-single-thread-validation.hpl b/integration-tests/gcp/0007-single-thread-validation.hpl
index 7c3c86dab6..aa6b4ec2eb 100644
--- a/integration-tests/gcp/0007-single-thread-validation.hpl
+++ b/integration-tests/gcp/0007-single-thread-validation.hpl
@@ -55,6 +55,16 @@ limitations under the License.
<to>xlsx</to>
<enabled>Y</enabled>
</hop>
+ <hop>
+ <from>gs://apache-hop-it/output/0007-*.parquet</from>
+ <to>read Parquet file</to>
+ <enabled>Y</enabled>
+ </hop>
+ <hop>
+ <from>read Parquet file</from>
+ <to>parquet</to>
+ <enabled>Y</enabled>
+ </hop>
</order>
<transform>
<name>csv</name>
@@ -69,8 +79,8 @@ limitations under the License.
</partitioning>
<attributes/>
<GUI>
- <xloc>384</xloc>
- <yloc>192</yloc>
+ <xloc>544</xloc>
+ <yloc>112</yloc>
</GUI>
</transform>
<transform>
@@ -125,8 +135,8 @@ limitations under the License.
<fields>
<field>
<name>id</name>
- <type>Integer</type>
- <format>0</format>
+ <type>String</type>
+ <format/>
<currency/>
<decimal/>
<group/>
@@ -135,7 +145,7 @@ limitations under the License.
<position>-1</position>
<length>15</length>
<precision>0</precision>
- <trim_type>none</trim_type>
+ <trim_type>both</trim_type>
<repeat>N</repeat>
</field>
<field>
@@ -301,8 +311,8 @@ limitations under the License.
<sizeFieldName/>
<attributes/>
<GUI>
- <xloc>192</xloc>
- <yloc>192</yloc>
+ <xloc>176</xloc>
+ <yloc>112</yloc>
</GUI>
</transform>
<transform>
@@ -480,8 +490,41 @@ limitations under the License.
<sizeFieldName/>
<attributes/>
<GUI>
- <xloc>192</xloc>
- <yloc>96</yloc>
+ <xloc>176</xloc>
+ <yloc>32</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>gs://apache-hop-it/output/0007-*.parquet</name>
+ <type>GetFileNames</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <isaddresult>Y</isaddresult>
+ <doNotFailIfNoFile>N</doNotFailIfNoFile>
+ <dynamic_include_subfolders>N</dynamic_include_subfolders>
+ <filefield>N</filefield>
+ <file>
+ <filemask>0007-.*\.parquet</filemask>
+ <name>gs://apache-hop-it/output/</name>
+ <file_required>N</file_required>
+ <include_subfolders>N</include_subfolders>
+ </file>
+ <filter>
+ <filterfiletype>all_files</filterfiletype>
+ </filter>
+ <rownum>N</rownum>
+ <raiseAnExceptionIfNoFile>N</raiseAnExceptionIfNoFile>
+ <limit>0</limit>
+ <attributes/>
+ <GUI>
+ <xloc>176</xloc>
+ <yloc>272</yloc>
</GUI>
</transform>
<transform>
@@ -666,8 +709,8 @@ limitations under the License.
<spreadsheet_type>SAX_POI</spreadsheet_type>
<attributes/>
<GUI>
- <xloc>192</xloc>
- <yloc>288</yloc>
+ <xloc>176</xloc>
+ <yloc>192</yloc>
</GUI>
</transform>
<transform>
@@ -682,9 +725,96 @@ limitations under the License.
<schema_name/>
</partitioning>
<attributes/>
+ <GUI>
+ <xloc>544</xloc>
+ <yloc>32</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>parquet</name>
+ <type>Dummy</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <attributes/>
+ <GUI>
+ <xloc>544</xloc>
+ <yloc>272</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>read Parquet file</name>
+ <type>ParquetFileInput</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <fields>
+ <field>
+ <source_field>id</source_field>
+ <target_field>id</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>lastName</source_field>
+ <target_field>lastName</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>firstName</source_field>
+ <target_field>firstName</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>zipCode</source_field>
+ <target_field>zipCode</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>city</source_field>
+ <target_field>city</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>birthdate</source_field>
+ <target_field>birthdate</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>street</source_field>
+ <target_field>street</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>housenr</source_field>
+ <target_field>housenr</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>stateCode</source_field>
+ <target_field>stateCode</target_field>
+ <target_type>String</target_type>
+ </field>
+ <field>
+ <source_field>state</source_field>
+ <target_field>state</target_field>
+ <target_type>String</target_type>
+ </field>
+ </fields>
+ <filename_field>filename</filename_field>
+ <attributes/>
<GUI>
<xloc>384</xloc>
- <yloc>96</yloc>
+ <yloc>272</yloc>
</GUI>
</transform>
<transform>
@@ -700,8 +830,8 @@ limitations under the License.
</partitioning>
<attributes/>
<GUI>
- <xloc>384</xloc>
- <yloc>288</yloc>
+ <xloc>544</xloc>
+ <yloc>192</yloc>
</GUI>
</transform>
<transform_error_handling>
diff --git a/integration-tests/gcp/0007-single-thread.hpl b/integration-tests/gcp/0007-single-thread.hpl
index 9a470d760d..89e9112dfe 100644
--- a/integration-tests/gcp/0007-single-thread.hpl
+++ b/integration-tests/gcp/0007-single-thread.hpl
@@ -38,6 +38,28 @@ limitations under the License.
<is_key_private>N</is_key_private>
</info>
<notepads>
+ <notepad>
+ <note>There is a google storage bug somewhere.
+Reading back a newly created xlsx file fails for some reason.
+</note>
+ <xloc>112</xloc>
+ <yloc>352</yloc>
+ <width>359</width>
+ <heigth>64</heigth>
+ <fontname>Noto Sans</fontname>
+ <fontsize>12</fontsize>
+ <fontbold>N</fontbold>
+ <fontitalic>N</fontitalic>
+ <fontcolorred>200</fontcolorred>
+ <fontcolorgreen>231</fontcolorgreen>
+ <fontcolorblue>250</fontcolorblue>
+ <backgroundcolorred>15</backgroundcolorred>
+ <backgroundcolorgreen>136</backgroundcolorgreen>
+ <backgroundcolorblue>210</backgroundcolorblue>
+ <bordercolorred>200</bordercolorred>
+ <bordercolorgreen>231</bordercolorgreen>
+ <bordercolorblue>250</bordercolorblue>
+ </notepad>
</notepads>
<order>
<hop>
@@ -58,6 +80,11 @@ limitations under the License.
<hop>
<from>FL,CA</from>
<to>0007-customers-ca-fl.xlsx</to>
+ <enabled>N</enabled>
+ </hop>
+ <hop>
+ <from>FL,CA</from>
+ <to>0007-customers-fl-ca.parquet</to>
<enabled>Y</enabled>
</hop>
</order>
@@ -86,9 +113,9 @@ limitations under the License.
<fileNameField/>
<create_parent_folder>Y</create_parent_folder>
<file>
- <name>gs://apache-hop-it/output/0007-customers-ca-fl-${Internal.Transform.ID}</name>
+ <name>gs://apache-hop-it/output/0007-customers-ca-fl</name>
<servlet_output>N</servlet_output>
- <do_not_open_new_file_init>N</do_not_open_new_file_init>
+ <do_not_open_new_file_init>Y</do_not_open_new_file_init>
<extention>csv</extention>
<append>N</append>
<split>N</split>
@@ -111,7 +138,7 @@ limitations under the License.
<decimal>.</decimal>
<group>,</group>
<nullif/>
- <trim_type>both</trim_type>
+ <trim_type>none</trim_type>
<length>-1</length>
<precision>-1</precision>
</field>
@@ -123,7 +150,7 @@ limitations under the License.
<decimal/>
<group/>
<nullif/>
- <trim_type>both</trim_type>
+ <trim_type>none</trim_type>
<length>-1</length>
<precision>-1</precision>
</field>
@@ -135,7 +162,7 @@ limitations under the License.
<decimal/>
<group/>
<nullif/>
- <trim_type>both</trim_type>
+ <trim_type>none</trim_type>
<length>-1</length>
<precision>-1</precision>
</field>
@@ -147,7 +174,7 @@ limitations under the License.
<decimal/>
<group/>
<nullif/>
- <trim_type>both</trim_type>
+ <trim_type>none</trim_type>
<length>-1</length>
<precision>-1</precision>
</field>
@@ -159,7 +186,7 @@ limitations under the License.
<decimal/>
<group/>
<nullif/>
- <trim_type>both</trim_type>
+ <trim_type>none</trim_type>
<length>-1</length>
<precision>-1</precision>
</field>
@@ -171,7 +198,7 @@ limitations under the License.
<decimal/>
<group/>
<nullif/>
- <trim_type>both</trim_type>
+ <trim_type>none</trim_type>
<length>-1</length>
<precision>-1</precision>
</field>
@@ -183,7 +210,7 @@ limitations under the License.
<decimal/>
<group/>
<nullif/>
- <trim_type>both</trim_type>
+ <trim_type>none</trim_type>
<length>-1</length>
<precision>-1</precision>
</field>
@@ -195,7 +222,7 @@ limitations under the License.
<decimal/>
<group/>
<nullif/>
- <trim_type>both</trim_type>
+ <trim_type>none</trim_type>
<length>-1</length>
<precision>-1</precision>
</field>
@@ -207,7 +234,7 @@ limitations under the License.
<decimal/>
<group/>
<nullif/>
- <trim_type>both</trim_type>
+ <trim_type>none</trim_type>
<length>-1</length>
<precision>-1</precision>
</field>
@@ -219,15 +246,15 @@ limitations under the License.
<decimal/>
<group/>
<nullif/>
- <trim_type>both</trim_type>
+ <trim_type>none</trim_type>
<length>-1</length>
<precision>-1</precision>
</field>
</fields>
<attributes/>
<GUI>
- <xloc>576</xloc>
- <yloc>240</yloc>
+ <xloc>720</xloc>
+ <yloc>160</yloc>
</GUI>
</transform>
<transform>
@@ -244,11 +271,12 @@ limitations under the License.
<addToResult>N</addToResult>
<createParentFolder>N</createParentFolder>
<dateInFilename>N</dateInFilename>
+ <doNotOpenNewFileInit>Y</doNotOpenNewFileInit>
<encoding>UTF-8</encoding>
<extension>json</extension>
<fileAppended>N</fileAppended>
<fileAsCommand>N</fileAsCommand>
- <fileName>gs://apache-hop-it/output/0007-customers-ca-fl-${Internal.Transform.ID}.json</fileName>
+ <fileName>gs://apache-hop-it/output/0007-customers-ca-fl</fileName>
<jsonBloc>customers</jsonBloc>
<nrRowsInBloc>50000</nrRowsInBloc>
<operation_type>writetofile</operation_type>
@@ -301,8 +329,8 @@ limitations under the License.
<transformNrInFilename>N</transformNrInFilename>
<attributes/>
<GUI>
- <xloc>576</xloc>
- <yloc>112</yloc>
+ <xloc>816</xloc>
+ <yloc>96</yloc>
</GUI>
</transform>
<transform>
@@ -325,16 +353,12 @@ limitations under the License.
<autosizecolums>Y</autosizecolums>
<createParentFolder>N</createParentFolder>
<add_date>N</add_date>
- <date_time_format/>
- <do_not_open_newfile_init>N</do_not_open_newfile_init>
+ <do_not_open_newfile_init>Y</do_not_open_newfile_init>
<extension>xlsx</extension>
- <name>gs://apache-hop-it/output/0007-customers-ca-fl-${Internal.Transform.ID}</name>
- <filename_field/>
+ <name>gs://apache-hop-it/output/0007-customers-ca-fl</name>
<filename_in_field>N</filename_in_field>
<if_file_exists>new</if_file_exists>
<if_sheet_exists>new</if_sheet_exists>
- <password/>
- <protected_by/>
<protect_sheet>N</protect_sheet>
<sheetname>CA-FL-Customers</sheetname>
<SpecifyFormat>N</SpecifyFormat>
@@ -350,123 +374,64 @@ limitations under the License.
<makeSheetActive>Y</makeSheetActive>
<fields>
<field>
- <commentAuthorField/>
- <commentField/>
<format>0</format>
<formula>N</formula>
- <hyperlinkField/>
<name>id</name>
- <styleCell/>
<title>id</title>
- <titleStyleCell/>
<type>Integer</type>
</field>
<field>
- <commentAuthorField/>
- <commentField/>
- <format/>
<formula>N</formula>
- <hyperlinkField/>
<name>lastName</name>
- <styleCell/>
<title>lastName</title>
- <titleStyleCell/>
<type>String</type>
</field>
<field>
- <commentAuthorField/>
- <commentField/>
- <format/>
<formula>N</formula>
- <hyperlinkField/>
<name>firstName</name>
- <styleCell/>
<title>firstName</title>
- <titleStyleCell/>
<type>String</type>
</field>
<field>
- <commentAuthorField/>
- <commentField/>
- <format/>
<formula>N</formula>
- <hyperlinkField/>
<name>zipCode</name>
- <styleCell/>
<title>zipCode</title>
- <titleStyleCell/>
<type>String</type>
</field>
<field>
- <commentAuthorField/>
- <commentField/>
- <format/>
<formula>N</formula>
- <hyperlinkField/>
<name>city</name>
- <styleCell/>
<title>city</title>
- <titleStyleCell/>
<type>String</type>
</field>
<field>
- <commentAuthorField/>
- <commentField/>
- <format/>
<formula>N</formula>
- <hyperlinkField/>
<name>birthdate</name>
- <styleCell/>
<title>birthdate</title>
- <titleStyleCell/>
<type>String</type>
</field>
<field>
- <commentAuthorField/>
- <commentField/>
- <format/>
<formula>N</formula>
- <hyperlinkField/>
<name>street</name>
- <styleCell/>
<title>street</title>
- <titleStyleCell/>
<type>String</type>
</field>
<field>
- <commentAuthorField/>
- <commentField/>
- <format/>
<formula>N</formula>
- <hyperlinkField/>
<name>housenr</name>
- <styleCell/>
<title>housenr</title>
- <titleStyleCell/>
<type>String</type>
</field>
<field>
- <commentAuthorField/>
- <commentField/>
- <format/>
<formula>N</formula>
- <hyperlinkField/>
<name>stateCode</name>
- <styleCell/>
<title>stateCode</title>
- <titleStyleCell/>
<type>String</type>
</field>
<field>
- <commentAuthorField/>
- <commentField/>
- <format/>
<formula>N</formula>
- <hyperlinkField/>
<name>state</name>
- <styleCell/>
<title>state</title>
- <titleStyleCell/>
<type>String</type>
</field>
</fields>
@@ -477,12 +442,85 @@ limitations under the License.
<filename>template.xls</filename>
<sheet_enabled>N</sheet_enabled>
<hidden>N</hidden>
- <sheetname/>
</template>
<attributes/>
<GUI>
- <xloc>576</xloc>
- <yloc>368</yloc>
+ <xloc>608</xloc>
+ <yloc>208</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>0007-customers-fl-ca.parquet</name>
+ <type>ParquetFileOutput</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>SINGLE_BEAM</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <compression_codec>UNCOMPRESSED</compression_codec>
+ <data_page_size>1048576</data_page_size>
+ <dictionary_page_size>1048576</dictionary_page_size>
+ <fields>
+ <field>
+ <source_field>id</source_field>
+ <target_field>id</target_field>
+ </field>
+ <field>
+ <source_field>lastName</source_field>
+ <target_field>lastName</target_field>
+ </field>
+ <field>
+ <source_field>firstName</source_field>
+ <target_field>firstName</target_field>
+ </field>
+ <field>
+ <source_field>zipCode</source_field>
+ <target_field>zipCode</target_field>
+ </field>
+ <field>
+ <source_field>city</source_field>
+ <target_field>city</target_field>
+ </field>
+ <field>
+ <source_field>birthdate</source_field>
+ <target_field>birthdate</target_field>
+ </field>
+ <field>
+ <source_field>street</source_field>
+ <target_field>street</target_field>
+ </field>
+ <field>
+ <source_field>housenr</source_field>
+ <target_field>housenr</target_field>
+ </field>
+ <field>
+ <source_field>stateCode</source_field>
+ <target_field>stateCode</target_field>
+ </field>
+ <field>
+ <source_field>state</source_field>
+ <target_field>state</target_field>
+ </field>
+ </fields>
+ <filename_split_size>1000000</filename_split_size>
+ <filename_base>gs://apache-hop-it/output/0007-customers-fl-ca</filename_base>
+ <filename_create_parent_folders>Y</filename_create_parent_folders>
+ <filename_datetime_format>yyyyMMdd-HHmmss</filename_datetime_format>
+ <filename_ext>parquet</filename_ext>
+ <filename_include_copy>N</filename_include_copy>
+ <filename_include_date>N</filename_include_date>
+ <filename_include_datetime>N</filename_include_datetime>
+ <filename_include_split>Y</filename_include_split>
+ <filename_include_time>N</filename_include_time>
+ <row_group_size>20000</row_group_size>
+ <version>2.0</version>
+ <attributes/>
+ <GUI>
+ <xloc>496</xloc>
+ <yloc>256</yloc>
</GUI>
</transform>
<transform>
@@ -518,7 +556,7 @@ limitations under the License.
<attributes/>
<GUI>
<xloc>304</xloc>
- <yloc>240</yloc>
+ <yloc>96</yloc>
</GUI>
</transform>
<transform>
@@ -537,7 +575,7 @@ limitations under the License.
<attributes/>
<GUI>
<xloc>128</xloc>
- <yloc>240</yloc>
+ <yloc>96</yloc>
</GUI>
</transform>
<transform_error_handling>
diff --git a/integration-tests/gcp/metadata/unit-test/0007-single-thread-validation UNIT.json b/integration-tests/gcp/metadata/unit-test/0007-single-thread-validation UNIT.json
index fb745be473..111fc9e60c 100644
--- a/integration-tests/gcp/metadata/unit-test/0007-single-thread-validation UNIT.json
+++ b/integration-tests/gcp/metadata/unit-test/0007-single-thread-validation UNIT.json
@@ -101,6 +101,55 @@
],
"transform_name": "csv",
"data_set_name": "0007-single-thread-golden"
+ },
+ {
+ "field_mappings": [
+ {
+ "transform_field": "lastName",
+ "data_set_field": "lastName"
+ },
+ {
+ "transform_field": "id",
+ "data_set_field": "id"
+ },
+ {
+ "transform_field": "firstName",
+ "data_set_field": "firstName"
+ },
+ {
+ "transform_field": "zipCode",
+ "data_set_field": "zipCode"
+ },
+ {
+ "transform_field": "city",
+ "data_set_field": "city"
+ },
+ {
+ "transform_field": "birthdate",
+ "data_set_field": "birthdate"
+ },
+ {
+ "transform_field": "street",
+ "data_set_field": "street"
+ },
+ {
+ "transform_field": "housenr",
+ "data_set_field": "housenr"
+ },
+ {
+ "transform_field": "stateCode",
+ "data_set_field": "stateCode"
+ },
+ {
+ "transform_field": "state",
+ "data_set_field": "state"
+ }
+ ],
+ "field_order": [
+ "id"
+ ],
+ "transform_name": "parquet",
+ "data_set_name": "0007-single-thread-golden"
}
],
"input_data_sets": [],
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBaseFn.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBaseFn.java
index dac3e368c6..1cfcecc35c 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBaseFn.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBaseFn.java
@@ -21,6 +21,8 @@ package org.apache.hop.beam.core.transform;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.commons.lang.StringUtils;
import org.apache.hop.beam.core.HopRow;
import org.apache.hop.core.Const;
@@ -40,6 +42,7 @@ import org.apache.hop.pipeline.config.PipelineRunConfiguration;
import org.apache.hop.pipeline.transform.ITransform;
import org.apache.hop.pipeline.transform.RowAdapter;
import org.apache.hop.pipeline.transform.stream.IStream;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +62,7 @@ public abstract class TransformBaseFn extends DoFn<HopRow, HopRow> {
protected transient List<IExecutionDataSampler> dataSamplers;
protected transient List<IExecutionDataSamplerStore> dataSamplerStores;
protected transient Timer executionInfoTimer;
+ protected transient BoundedWindow batchWindow;
public TransformBaseFn(String parentLogChannelId, String runConfigName, String dataSamplersJson) {
this.parentLogChannelId = parentLogChannelId;
@@ -187,4 +191,39 @@ public abstract class TransformBaseFn extends DoFn<HopRow, HopRow> {
Const.toLong(executionInfoLocation.getDataLoggingInterval(), 10000L));
}
}
+
+ protected interface TupleOutputContext<T> {
+ void output(TupleTag<T> tupleTag, T output);
+ }
+
+ protected class TransformProcessContext implements TupleOutputContext<HopRow> {
+
+ private DoFn.ProcessContext context;
+
+ public TransformProcessContext(DoFn.ProcessContext processContext) {
+ this.context = processContext;
+ }
+
+ @Override
+ public void output(TupleTag<HopRow> tupleTag, HopRow output) {
+ context.output(tupleTag, output);
+ }
+ }
+
+ protected class TransformFinishBundleContext implements TupleOutputContext<HopRow> {
+
+ private DoFn.FinishBundleContext context;
+ private BoundedWindow batchWindow;
+
+ public TransformFinishBundleContext(
+ DoFn.FinishBundleContext context, BoundedWindow batchWindow) {
+ this.context = context;
+ this.batchWindow = batchWindow;
+ }
+
+ @Override
+ public void output(TupleTag<HopRow> tupleTag, HopRow output) {
+ context.output(tupleTag, output, Instant.now(), batchWindow);
+ }
+ }
}
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBatchTransform.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBatchTransform.java
index b01f7b44ce..08a7bada95 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBatchTransform.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBatchTransform.java
@@ -19,7 +19,6 @@ package org.apache.hop.beam.core.transform;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.*;
@@ -52,7 +51,6 @@ import org.apache.hop.pipeline.transform.*;
import org.apache.hop.pipeline.transforms.dummy.DummyMeta;
import org.apache.hop.pipeline.transforms.injector.InjectorField;
import org.apache.hop.pipeline.transforms.injector.InjectorMeta;
-import org.joda.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -232,7 +230,6 @@ public class TransformBatchTransform extends TransformTransform {
private transient SingleThreadedPipelineExecutor executor;
private transient Queue<HopRow> rowBuffer;
- private transient BoundedWindow batchWindow;
private transient AtomicLong lastTimerCheck;
private transient Timer timer;
@@ -838,38 +835,6 @@ public class TransformBatchTransform extends TransformTransform {
}
}
- private interface TupleOutputContext<T> {
- void output(TupleTag<T> tupleTag, T output);
- }
-
- private class TransformProcessContext implements TupleOutputContext<HopRow> {
- private DoFn.ProcessContext context;
-
- public TransformProcessContext(DoFn.ProcessContext processContext) {
- this.context = processContext;
- }
-
- @Override
- public void output(TupleTag<HopRow> tupleTag, HopRow output) {
- context.output(tupleTag, output);
- }
- }
- private class TransformFinishBundleContext implements TupleOutputContext<HopRow> {
-
- private DoFn.FinishBundleContext context;
- private BoundedWindow batchWindow;
-
- public TransformFinishBundleContext(
- DoFn.FinishBundleContext context, BoundedWindow batchWindow) {
- this.context = context;
- this.batchWindow = batchWindow;
- }
-
- @Override
- public void output(TupleTag<HopRow> tupleTag, HopRow output) {
- context.output(tupleTag, output, Instant.now(), batchWindow);
- }
- }
}
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformFn.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformFn.java
index e808351977..55b5b63fdc 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformFn.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformFn.java
@@ -18,6 +18,7 @@
package org.apache.hop.beam.core.transform;
+import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
@@ -50,6 +51,7 @@ import org.apache.hop.pipeline.transforms.dummy.DummyMeta;
import org.apache.hop.pipeline.transforms.injector.InjectorField;
import org.apache.hop.pipeline.transforms.injector.InjectorMeta;
import org.joda.time.Instant;
+import org.json.simple.parser.ParseException;
import java.util.ArrayList;
import java.util.List;
@@ -139,8 +141,7 @@ public class TransformFn extends TransformBaseFn {
}
@Setup
- public void setup() {
- }
+ public void setup() {}
/**
* Reset the row buffer every time we start a new bundle to prevent the output of double rows
@@ -149,297 +150,327 @@ public class TransformFn extends TransformBaseFn {
*/
@StartBundle
public void startBundle(StartBundleContext startBundleContext) {
- if ("ScriptValueMod".equals(transformPluginId) && pipeline != null) {
- // Force re-initialization for this specific transform plugin
- initialize = true;
+ try {
+ // TODO: create a test to see if this is still needed
+ //
+ // if ("ScriptValueMod".equals(transformPluginId) && pipeline != null) {
+ // // Force re-initialization for this specific transform plugin
+ // initialize = true;
+ // }
+ //
+
+ // Call start of the bundle on the transform
+ //
+ if (executor != null) {
+ // Increment the bundle number before calling the next startBundle() method in the Hop transforms.
+ //
+ executor
+ .getPipeline()
+ .getTransforms()
+ .forEach(combi -> combi.data.setBeamBundleNr(combi.data.getBeamBundleNr() + 1));
+
+ executor.startBundle();
+ }
+ } catch (HopException e) {
+ throw new RuntimeException("Error at start of bundle!", e);
}
}
-
@ProcessElement
public void processElement(ProcessContext context, BoundedWindow window) {
-
try {
if (initialize) {
- initialize = false;
- // Initialize Hop and load extra plugins as well
- //
- BeamHop.init(transformPluginClasses, xpPluginClasses);
+ initializeTransformPipeline(context);
+ }
- // The content of the metadata is JSON serialized and inflated below.
- //
- IHopMetadataProvider metadataProvider = new SerializableMetadataProvider(metastoreJson);
- IVariables variables = new Variables();
- for (VariableValue variableValue : variableValues) {
- if (StringUtils.isNotEmpty(variableValue.getVariable())) {
- variables.setVariable(variableValue.getVariable(), variableValue.getValue());
- }
- }
+ // Get one row from the context main input and make a copy, so we can change it.
+ //
+ HopRow originalInputRow = context.element();
+ HopRow inputRow = HopBeamUtil.copyHopRow(originalInputRow, inputRowMeta);
+ readCounter.inc();
- // Create a very simple new transformation to run single threaded...
- // Single threaded...
- //
- pipelineMeta = new PipelineMeta();
- pipelineMeta.setName(transformName);
- pipelineMeta.setPipelineType(PipelineMeta.PipelineType.SingleThreaded);
- pipelineMeta.setMetadataProvider(metadataProvider);
+ emptyRowBuffer(new TransformProcessContext(context), inputRow);
+ } catch (Exception e) {
+ numErrors.inc();
+ LOG.error("Transform execution error :" + e.getMessage());
+ throw new RuntimeException("Error executing TransformFn", e);
+ }
+ }
- // Input row metadata...
- //
- inputRowMeta = JsonRowMeta.fromJson(inputRowMetaJson);
- List<IRowMeta> infoRowMetas = new ArrayList<>();
- for (String infoRowMetaJson : infoRowMetaJsons) {
- IRowMeta infoRowMeta = JsonRowMeta.fromJson(infoRowMetaJson);
- infoRowMetas.add(infoRowMeta);
- }
+ private void initializeTransformPipeline(DoFn<HopRow, HopRow>.ProcessContext context)
+ throws HopException, ParseException, JsonProcessingException {
+ initialize = false;
+ // Initialize Hop and load extra plugins as well
+ //
+ BeamHop.init(transformPluginClasses, xpPluginClasses);
- // Create an Injector transform with the right row layout...
- // This will help all transforms see the row layout statically...
- //
- TransformMeta mainInjectorTransformMeta = null;
- if (!inputTransform) {
- mainInjectorTransformMeta =
- createInjectorTransform(
- pipelineMeta, INJECTOR_TRANSFORM_NAME, inputRowMeta, 200, 200);
- }
+ // The content of the metadata is JSON serialized and inflated below.
+ //
+ IHopMetadataProvider metadataProvider = new SerializableMetadataProvider(metastoreJson);
+ IVariables variables = new Variables();
+ for (VariableValue variableValue : variableValues) {
+ if (StringUtils.isNotEmpty(variableValue.getVariable())) {
+ variables.setVariable(variableValue.getVariable(), variableValue.getValue());
+ }
+ }
- // Our main transform writes to a bunch of targets
- // Add a dummy transform for each one so the transform can target them
- //
- int targetLocationY = 200;
- List<TransformMeta> targetTransformMetas = new ArrayList<>();
- for (String targetTransform : targetTransforms) {
- DummyMeta dummyMeta = new DummyMeta();
- TransformMeta targetTransformMeta = new TransformMeta(targetTransform, dummyMeta);
- targetTransformMeta.setLocation(600, targetLocationY);
- targetLocationY += 150;
-
- targetTransformMetas.add(targetTransformMeta);
- pipelineMeta.addTransform(targetTransformMeta);
- }
+ // Create a very simple new transformation to run single threaded...
+ // Single threaded...
+ //
+ pipelineMeta = new PipelineMeta();
+ pipelineMeta.setName(transformName);
+ pipelineMeta.setPipelineType(PipelineMeta.PipelineType.SingleThreaded);
+ pipelineMeta.setMetadataProvider(metadataProvider);
- // The transform might read information from info transforms
- // like "Stream Lookup" or "Validator".
- // They read all the data on input from a side input.
- //
- List<List<HopRow>> infoDataSets = new ArrayList<>();
- List<TransformMeta> infoTransformMetas = new ArrayList<>();
- for (int i = 0; i < infoTransforms.size(); i++) {
- String infoTransform = infoTransforms.get(i);
- PCollectionView<List<HopRow>> cv = infoCollectionViews.get(i);
-
- // Get the data from the side input, from the info transform(s)
- //
- List<HopRow> infoDataSet = context.sideInput(cv);
- infoDataSets.add(infoDataSet);
-
- IRowMeta infoRowMeta = infoRowMetas.get(i);
-
- // Add an Injector transform for every info transform so the transform can read from it
- //
- TransformMeta infoTransformMeta =
- createInjectorTransform(pipelineMeta, infoTransform, infoRowMeta, 200, 350 + 150 * i);
- infoTransformMetas.add(infoTransformMeta);
- }
+ // Input row metadata...
+ //
+ inputRowMeta = JsonRowMeta.fromJson(inputRowMetaJson);
+ List<IRowMeta> infoRowMetas = new ArrayList<>();
+ for (String infoRowMetaJson : infoRowMetaJsons) {
+ IRowMeta infoRowMeta = JsonRowMeta.fromJson(infoRowMetaJson);
+ infoRowMetas.add(infoRowMeta);
+ }
- transformCombis = new ArrayList<>();
+ // Create an Injector transform with the right row layout...
+ // This will help all transforms see the row layout statically...
+ //
+ TransformMeta mainInjectorTransformMeta = null;
+ if (!inputTransform) {
+ mainInjectorTransformMeta =
+ createInjectorTransform(pipelineMeta, INJECTOR_TRANSFORM_NAME, inputRowMeta, 200, 200);
+ }
- // The main transform inflated from XML metadata...
- //
- PluginRegistry registry = PluginRegistry.getInstance();
- ITransformMeta iTransformMeta =
- registry.loadClass(TransformPluginType.class, transformPluginId, ITransformMeta.class);
- if (iTransformMeta == null) {
- throw new HopException(
- "Unable to load transform plugin with ID "
- + transformPluginId
- + ", this plugin isn't in the plugin registry or classpath");
- }
+ // Our main transform writes to a bunch of targets
+ // Add a dummy transform for each one so the transform can target them
+ //
+ int targetLocationY = 200;
+ List<TransformMeta> targetTransformMetas = new ArrayList<>();
+ for (String targetTransform : targetTransforms) {
+ DummyMeta dummyMeta = new DummyMeta();
+ TransformMeta targetTransformMeta = new TransformMeta(targetTransform, dummyMeta);
+ targetTransformMeta.setLocation(600, targetLocationY);
+ targetLocationY += 150;
+
+ targetTransformMetas.add(targetTransformMeta);
+ pipelineMeta.addTransform(targetTransformMeta);
+ }
- HopBeamUtil.loadTransformMetadataFromXml(
- transformName,
- iTransformMeta,
- transformMetaInterfaceXml,
- pipelineMeta.getMetadataProvider());
-
- transformMeta = new TransformMeta(transformName, iTransformMeta);
- transformMeta.setTransformPluginId(transformPluginId);
- transformMeta.setLocation(400, 200);
- pipelineMeta.addTransform(transformMeta);
- if (!inputTransform) {
- pipelineMeta.addPipelineHop(
- new PipelineHopMeta(mainInjectorTransformMeta, transformMeta));
- }
- // The target hops as well
- //
- for (TransformMeta targetTransformMeta : targetTransformMetas) {
- pipelineMeta.addPipelineHop(new PipelineHopMeta(transformMeta, targetTransformMeta));
- }
+ // The transform might read information from info transforms
+ // like "Stream Lookup" or "Validator".
+ // They read all the data on input from a side input.
+ //
+ List<List<HopRow>> infoDataSets = new ArrayList<>();
+ List<TransformMeta> infoTransformMetas = new ArrayList<>();
+ for (int i = 0; i < infoTransforms.size(); i++) {
+ String infoTransform = infoTransforms.get(i);
+ PCollectionView<List<HopRow>> cv = infoCollectionViews.get(i);
- // And the info hops...
- //
- for (TransformMeta infoTransformMeta : infoTransformMetas) {
- pipelineMeta.addPipelineHop(new PipelineHopMeta(infoTransformMeta, transformMeta));
- }
+ // Get the data from the side input, from the info transform(s)
+ //
+ List<HopRow> infoDataSet = context.sideInput(cv);
+ infoDataSets.add(infoDataSet);
- // If we are sending execution information to a location, see if we have any extra data
- // samplers
- // The data samplers list is composed of those in the data profile along with the set from
- // the extra ones in the parent pipeline.
- //
- lookupExecutionInformation(metadataProvider);
+ IRowMeta infoRowMeta = infoRowMetas.get(i);
+
+ // Add an Injector transform for every info transform so the transform can read from it
+ //
+ TransformMeta infoTransformMeta =
+ createInjectorTransform(pipelineMeta, infoTransform, infoRowMeta, 200, 350 + 150 * i);
+ infoTransformMetas.add(infoTransformMeta);
+ }
- iTransformMeta.searchInfoAndTargetTransforms(pipelineMeta.getTransforms());
+ transformCombis = new ArrayList<>();
- // Create the transformation...
- //
- pipeline =
- new LocalPipelineEngine(
- pipelineMeta, variables, new LoggingObject("apache-beam-transform"));
- pipeline.setLogLevel(
- context.getPipelineOptions().as(HopPipelineExecutionOptions.class).getLogLevel());
- pipeline.setMetadataProvider(pipelineMeta.getMetadataProvider());
-
- // Change the name to make the logging less confusing.
- //
- pipeline
- .getPipelineRunConfiguration()
- .setName("beam-transform-local (" + transformName + ")");
+ // The main transform inflated from XML metadata...
+ //
+ PluginRegistry registry = PluginRegistry.getInstance();
+ ITransformMeta iTransformMeta =
+ registry.loadClass(TransformPluginType.class, transformPluginId, ITransformMeta.class);
+ if (iTransformMeta == null) {
+ throw new HopException(
+ "Unable to load transform plugin with ID "
+ + transformPluginId
+ + ", this plugin isn't in the plugin registry or classpath");
+ }
- pipeline.prepareExecution();
+ HopBeamUtil.loadTransformMetadataFromXml(
+ transformName,
+ iTransformMeta,
+ transformMetaInterfaceXml,
+ pipelineMeta.getMetadataProvider());
- // Create producers so we can efficiently pass data
- //
- rowProducer = null;
- if (!inputTransform) {
- rowProducer = pipeline.addRowProducer(INJECTOR_TRANSFORM_NAME, 0);
- }
- List<RowProducer> infoRowProducers = new ArrayList<>();
- for (String infoTransform : infoTransforms) {
- RowProducer infoRowProducer = pipeline.addRowProducer(infoTransform, 0);
- infoRowProducers.add(infoRowProducer);
- }
+ transformMeta = new TransformMeta(transformName, iTransformMeta);
+ transformMeta.setTransformPluginId(transformPluginId);
+ transformMeta.setLocation(400, 200);
+ pipelineMeta.addTransform(transformMeta);
+ if (!inputTransform) {
+ pipelineMeta.addPipelineHop(new PipelineHopMeta(mainInjectorTransformMeta, transformMeta));
+ }
+ // The target hops as well
+ //
+ for (TransformMeta targetTransformMeta : targetTransformMetas) {
+ pipelineMeta.addPipelineHop(new PipelineHopMeta(transformMeta, targetTransformMeta));
+ }
- // Find the right interfaces for execution later...
- //
- if (!inputTransform) {
- TransformMetaDataCombi injectorCombi = findCombi(pipeline, INJECTOR_TRANSFORM_NAME);
- transformCombis.add(injectorCombi);
- }
+ // And the info hops...
+ //
+ for (TransformMeta infoTransformMeta : infoTransformMetas) {
+ pipelineMeta.addPipelineHop(new PipelineHopMeta(infoTransformMeta, transformMeta));
+ }
- TransformMetaDataCombi transformCombi = findCombi(pipeline, transformName);
- transformCombis.add(transformCombi);
-
- if (targetTransforms.isEmpty()) {
- IRowListener rowListener =
- new RowAdapter() {
- @Override
- public void rowWrittenEvent(IRowMeta rowMeta, Object[] row)
- throws HopTransformException {
- resultRows.add(new HopRow(row, rowMeta.size()));
- }
- };
- transformCombi.transform.addRowListener(rowListener);
- }
+ // If we are sending execution information to a location, see if we have any extra data
+ // samplers
+ // The data samplers list is composed of those in the data profile along with the set from
+ // the extra ones in the parent pipeline.
+ //
+ lookupExecutionInformation(metadataProvider);
- // Create a list of TupleTag to direct the target rows
- //
- mainTupleTag = new TupleTag<>(HopBeamUtil.createMainOutputTupleId(transformName)) {};
- tupleTagList = new ArrayList<>();
+ iTransformMeta.searchInfoAndTargetTransforms(pipelineMeta.getTransforms());
- // The lists in here will contain all the rows that ended up in the various target
- // transforms (if any)
- //
- targetResultRowsList = new ArrayList<>();
-
- for (String targetTransform : targetTransforms) {
- TransformMetaDataCombi targetCombi = findCombi(pipeline, targetTransform);
- transformCombis.add(targetCombi);
-
- String tupleId = HopBeamUtil.createTargetTupleId(transformName, targetTransform);
- TupleTag<HopRow> tupleTag = new TupleTag<>(tupleId) {};
- tupleTagList.add(tupleTag);
- final List<Object[]> targetResultRows = new ArrayList<>();
- targetResultRowsList.add(targetResultRows);
-
- targetCombi.transform.addRowListener(
- new RowAdapter() {
- @Override
- public void rowReadEvent(IRowMeta rowMeta, Object[] row)
- throws HopTransformException {
- // We send the target row to a specific list...
- //
- targetResultRows.add(row);
- }
- });
- }
+ // Create the transformation...
+ //
+ pipeline =
+ new LocalPipelineEngine(
+ pipelineMeta, variables, new LoggingObject("apache-beam-transform"));
+ pipeline.setLogLevel(
+ context.getPipelineOptions().as(HopPipelineExecutionOptions.class).getLogLevel());
+ pipeline.setMetadataProvider(pipelineMeta.getMetadataProvider());
+
+ // Change the name to make the logging less confusing.
+ //
+ pipeline.getPipelineRunConfiguration().setName("beam-transform-local (" + transformName + ")");
- attachExecutionSamplersToOutput(
- variables,
- transformName,
- pipeline.getLogChannelId(),
- inputRowMeta,
- pipelineMeta.getTransformFields(variables, transformName),
- pipeline.getTransform(transformName, 0));
+ pipeline.prepareExecution();
- executor = new SingleThreadedPipelineExecutor(pipeline);
+ // Indicate that we're dealing with a Beam pipeline during execution
+ // Start counting bundle numbers from 1
+ //
+ pipeline.getTransforms().forEach(c -> {
+ c.data.setBeamContext(true);
+ c.data.setBeamBundleNr(1);
+ });
- // Initialize the transforms...
- //
- executor.init();
+ // Create producers so we can efficiently pass data
+ //
+ rowProducer = null;
+ if (!inputTransform) {
+ rowProducer = pipeline.addRowProducer(INJECTOR_TRANSFORM_NAME, 0);
+ }
+ List<RowProducer> infoRowProducers = new ArrayList<>();
+ for (String infoTransform : infoTransforms) {
+ RowProducer infoRowProducer = pipeline.addRowProducer(infoTransform, 0);
+ infoRowProducers.add(infoRowProducer);
+ }
- Counter initCounter = Metrics.counter(Pipeline.METRIC_NAME_INIT, transformName);
- readCounter = Metrics.counter(Pipeline.METRIC_NAME_READ, transformName);
- writtenCounter = Metrics.counter(Pipeline.METRIC_NAME_WRITTEN, transformName);
+ // Find the right interfaces for execution later...
+ //
+ if (!inputTransform) {
+ TransformMetaDataCombi injectorCombi = findCombi(pipeline, INJECTOR_TRANSFORM_NAME);
+ transformCombis.add(injectorCombi);
+ }
- initCounter.inc();
+ TransformMetaDataCombi transformCombi = findCombi(pipeline, transformName);
+ transformCombis.add(transformCombi);
+
+ if (targetTransforms.isEmpty()) {
+ IRowListener rowListener =
+ new RowAdapter() {
+ @Override
+ public void rowWrittenEvent(IRowMeta rowMeta, Object[] row)
+ throws HopTransformException {
+ resultRows.add(new HopRow(row, rowMeta.size()));
+ }
+ };
+ transformCombi.transform.addRowListener(rowListener);
+ }
- // Doesn't really start the threads in single threaded mode
- // Just sets some flags all over the place
- //
- pipeline.startThreads();
+ // Create a list of TupleTag to direct the target rows
+ //
+ mainTupleTag = new TupleTag<>(HopBeamUtil.createMainOutputTupleId(transformName)) {};
+ tupleTagList = new ArrayList<>();
- resultRows = new ArrayList<>();
+ // The lists in here will contain all the rows that ended up in the various target
+ // transforms (if any)
+ //
+ targetResultRowsList = new ArrayList<>();
+
+ for (String targetTransform : targetTransforms) {
+ TransformMetaDataCombi targetCombi = findCombi(pipeline, targetTransform);
+ transformCombis.add(targetCombi);
+
+ String tupleId = HopBeamUtil.createTargetTupleId(transformName, targetTransform);
+ TupleTag<HopRow> tupleTag = new TupleTag<>(tupleId) {};
+ tupleTagList.add(tupleTag);
+ final List<Object[]> targetResultRows = new ArrayList<>();
+ targetResultRowsList.add(targetResultRows);
+
+ targetCombi.transform.addRowListener(
+ new RowAdapter() {
+ @Override
+ public void rowReadEvent(IRowMeta rowMeta, Object[] row) throws HopTransformException {
+ // We send the target row to a specific list...
+ //
+ targetResultRows.add(row);
+ }
+ });
+ }
- // Copy the info data sets to the info transforms...
- // We do this only once so all subsequent rows can use this.
- //
- for (int i = 0; i < infoTransforms.size(); i++) {
- RowProducer infoRowProducer = infoRowProducers.get(i);
- List<HopRow> infoDataSet = infoDataSets.get(i);
- TransformMetaDataCombi combi = findCombi(pipeline, infoTransforms.get(i));
- IRowMeta infoRowMeta = infoRowMetas.get(i);
-
- // Pass and process the rows in the info transforms
- //
- for (HopRow infoRowData : infoDataSet) {
- infoRowProducer.putRow(infoRowMeta, infoRowData.getRow());
- combi.transform.processRow();
- }
-
- // By calling finished() transforms like Stream Lookup know no more rows are going to
- // come, and they can start to work with the info data set
- //
- infoRowProducer.finished();
-
- // Call once more to flag input as done, transform as finished.
- //
- combi.transform.processRow();
- }
+ attachExecutionSamplersToOutput(
+ variables,
+ transformName,
+ pipeline.getLogChannelId(),
+ inputRowMeta,
+ pipelineMeta.getTransformFields(variables, transformName),
+ pipeline.getTransform(transformName, 0));
+
+ executor = new SingleThreadedPipelineExecutor(pipeline);
+
+ // Initialize the transforms...
+ //
+ executor.init();
+
+ Counter initCounter = Metrics.counter(Pipeline.METRIC_NAME_INIT, transformName);
+ readCounter = Metrics.counter(Pipeline.METRIC_NAME_READ, transformName);
+ writtenCounter = Metrics.counter(Pipeline.METRIC_NAME_WRITTEN, transformName);
+
+ initCounter.inc();
+
+ // Doesn't really start the threads in single threaded mode
+ // Just sets some flags all over the place
+ //
+ pipeline.startThreads();
+
+ resultRows = new ArrayList<>();
+
+ // Copy the info data sets to the info transforms...
+ // We do this only once so all subsequent rows can use this.
+ //
+ for (int i = 0; i < infoTransforms.size(); i++) {
+ RowProducer infoRowProducer = infoRowProducers.get(i);
+ List<HopRow> infoDataSet = infoDataSets.get(i);
+ TransformMetaDataCombi combi = findCombi(pipeline, infoTransforms.get(i));
+ IRowMeta infoRowMeta = infoRowMetas.get(i);
+
+ // Pass and process the rows in the info transforms
+ //
+ for (HopRow infoRowData : infoDataSet) {
+ infoRowProducer.putRow(infoRowMeta, infoRowData.getRow());
+ combi.transform.processRow();
}
- // Get one row from the context main input and make a copy, so we can change it.
+ // By calling finished() transforms like Stream Lookup know no more rows are going to
+ // come, and they can start to work with the info data set
//
- HopRow originalInputRow = context.element();
- HopRow inputRow = HopBeamUtil.copyHopRow(originalInputRow, inputRowMeta);
- readCounter.inc();
+ infoRowProducer.finished();
- emptyRowBuffer(new TransformProcessContext(context), inputRow);
- } catch (Exception e) {
- numErrors.inc();
- LOG.error("Transform execution error :" + e.getMessage());
- throw new RuntimeException("Error executing TransformFn", e);
+ // Call once more to flag input as done, transform as finished.
+ //
+ combi.transform.processRow();
}
+
+ // Flag the start of a bundle for the first time.
+ //
+ executor.startBundle();
}
/**
@@ -568,7 +599,11 @@ public class TransformFn extends TransformBaseFn {
@FinishBundle
public void finishBundle(FinishBundleContext context) {
try {
-
+ // Signal the end of the bundle on the transform
+ //
+ if (executor != null) {
+ executor.finishBundle();
+ }
} catch (Exception e) {
numErrors.inc();
LOG.error("Transform finishing bundle error :" + e.getMessage());
@@ -593,11 +628,9 @@ public class TransformFn extends TransformBaseFn {
}
} catch (Exception e) {
LOG.error(
- "Error sending row samples to execution info location for transform " + transformName,
- e);
+ "Error sending row samples to execution info location for transform " + transformName, e);
throw new RuntimeException(
- "Error sending row samples to execution info location for transform " + transformName,
- e);
+ "Error sending row samples to execution info location for transform " + transformName, e);
}
}
}
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformTransform.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformTransform.java
index 105868a3aa..cb482aa21f 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformTransform.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformTransform.java
@@ -19,37 +19,13 @@ package org.apache.hop.beam.core.transform;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.*;
-import org.apache.commons.lang.StringUtils;
import org.apache.hop.beam.core.BeamHop;
import org.apache.hop.beam.core.HopRow;
import org.apache.hop.beam.core.shared.VariableValue;
import org.apache.hop.beam.core.util.HopBeamUtil;
-import org.apache.hop.beam.core.util.JsonRowMeta;
-import org.apache.hop.beam.engines.HopPipelineExecutionOptions;
-import org.apache.hop.core.exception.HopException;
-import org.apache.hop.core.exception.HopTransformException;
-import org.apache.hop.core.logging.LogLevel;
-import org.apache.hop.core.logging.LoggingObject;
-import org.apache.hop.core.metadata.SerializableMetadataProvider;
-import org.apache.hop.core.plugins.PluginRegistry;
-import org.apache.hop.core.plugins.TransformPluginType;
-import org.apache.hop.core.row.IRowMeta;
-import org.apache.hop.core.row.IValueMeta;
-import org.apache.hop.core.variables.IVariables;
-import org.apache.hop.core.variables.Variables;
-import org.apache.hop.metadata.api.IHopMetadataProvider;
-import org.apache.hop.pipeline.*;
-import org.apache.hop.pipeline.engines.local.LocalPipelineEngine;
-import org.apache.hop.pipeline.transform.*;
-import org.apache.hop.pipeline.transforms.dummy.DummyMeta;
-import org.apache.hop.pipeline.transforms.injector.InjectorField;
-import org.apache.hop.pipeline.transforms.injector.InjectorMeta;
-import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/HopPipelineMetaToBeamPipelineConverter.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/HopPipelineMetaToBeamPipelineConverter.java
index a97b42fb44..9b0dce24cb 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/HopPipelineMetaToBeamPipelineConverter.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/HopPipelineMetaToBeamPipelineConverter.java
@@ -557,9 +557,9 @@ public class HopPipelineMetaToBeamPipelineConverter {
throw new HopException(
"Group By is not supported. Use the Memory Group By transform instead. It comes closest to Beam functionality.");
}
- if (meta instanceof SortRowsMeta) {
- throw new HopException("Sort rows is not yet supported on Beam.");
- }
+// if (meta instanceof SortRowsMeta) {
+// throw new HopException("Sort rows is not yet supported on Beam.");
+// }
if (meta instanceof UniqueRowsMeta) {
throw new HopException(
"The unique rows transform is not yet supported on Beam, for now use a Memory Group By to get distinct rows");
diff --git a/plugins/tech/neo4j/src/main/java/org/apache/hop/neo4j/transforms/cypher/Cypher.java b/plugins/tech/neo4j/src/main/java/org/apache/hop/neo4j/transforms/cypher/Cypher.java
index 613c966fe4..71216b6b1c 100644
--- a/plugins/tech/neo4j/src/main/java/org/apache/hop/neo4j/transforms/cypher/Cypher.java
+++ b/plugins/tech/neo4j/src/main/java/org/apache/hop/neo4j/transforms/cypher/Cypher.java
@@ -690,13 +690,13 @@ public class Cypher extends BaseTransform<CypherMeta, CypherData> {
}
@Override
- public void batchComplete() {
+ public void batchComplete() throws HopException {
try {
wrapUpTransaction();
} catch (Exception e) {
setErrors(getErrors() + 1);
stopAll();
- throw new RuntimeException("Unable to complete batch of records", e);
+ throw new HopException("Unable to complete batch of records", e);
}
}
diff --git a/plugins/tech/parquet/pom.xml b/plugins/tech/parquet/pom.xml
index 5c4bc8632e..4bce93a3a3 100755
--- a/plugins/tech/parquet/pom.xml
+++ b/plugins/tech/parquet/pom.xml
@@ -43,7 +43,7 @@
</licenses>
<properties>
- <parquet.version>1.12.0</parquet.version>
+ <parquet.version>1.12.3</parquet.version>
<hadoop.version>2.6.5</hadoop.version>
</properties>
diff --git a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutput.java b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutput.java
index 7ade914c59..e681ded16b 100644
--- a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutput.java
+++ b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutput.java
@@ -269,6 +269,9 @@ public class ParquetOutput extends BaseTransform<ParquetOutputMeta, ParquetOutpu
if (meta.isFilenameIncludingSplitNr()) {
filename += "-" + new DecimalFormat("0000").format(data.split);
}
+ if (data.isBeamContext()) {
+ filename+= "_"+log.getLogChannelId()+"_"+data.getBeamBundleNr();
+ }
filename += "." + Const.NVL(resolve(meta.getFilenameExtension()), "parquet");
filename += meta.getCompressionCodec().getExtension();
return filename;
@@ -281,4 +284,23 @@ public class ParquetOutput extends BaseTransform<ParquetOutputMeta, ParquetOutpu
throw new HopException("Error closing file " + data.filename, e);
}
}
+
+ @Override
+ public void batchComplete() throws HopException {
+ if (!data.isBeamContext()) {
+ closeFile();
+ }
+ }
+
+ @Override
+ public void startBundle() throws HopException {
+ if (!first) {
+ openNewFile();
+ }
+ }
+
+ @Override
+ public void finishBundle() throws HopException {
+ closeFile();
+ }
}
diff --git a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterFileField.java b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterFileField.java
index d9ff813348..591ce41b63 100644
--- a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterFileField.java
+++ b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterFileField.java
@@ -338,7 +338,7 @@ public class ExcelWriterFileField {
autosizecolums = false;
streamingData = false;
extension = "xls";
- doNotOpenNewFileInit = false;
+ doNotOpenNewFileInit = true;
transformNrInFilename = false;
dateInFilename = false;
timeInFilename = false;
diff --git a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransform.java b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransform.java
index a0e260a229..e0c80964f6 100644
--- a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransform.java
+++ b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransform.java
@@ -69,9 +69,9 @@ public class ExcelWriterTransform
// get next row
Object[] r = getRow();
- // first row initialization
+ // We might have a row here, or we might get a null row if there was no input.
+ //
if (first) {
-
first = false;
if (r == null) {
data.outputRowMeta = new RowMeta();
@@ -81,10 +81,10 @@ public class ExcelWriterTransform
data.inputRowMeta = getInputRowMeta().clone();
}
- // if we are supposed to init the file up front, here we go
+ // If we are supposed to create the file up front regardless of whether we receive input rows
+ // then this is the place to do it.
+ //
if (!meta.getFile().isDoNotOpenNewFileInit()) {
- data.firstFileOpened = true;
-
try {
prepareNextOutputFile(r);
} catch (HopException e) {
@@ -100,9 +100,9 @@ public class ExcelWriterTransform
}
if (r != null) {
- // if we are supposed to init the file delayed, here we go
+ // If we are supposed to create a file after receiving the first row, we do this here.
+ //
if (meta.getFile().isDoNotOpenNewFileInit()) {
- data.firstFileOpened = true;
prepareNextOutputFile(r);
}
@@ -166,19 +166,25 @@ public class ExcelWriterTransform
if (r != null) {
// check if the filename has changed between rows
- if (meta.getFile().isFileNameInField()
- && !data.currentWorkbookDefinition.getFile().equals(getFileLocation(r))) {
- // check if the file is already used and switch or create new file
- boolean fileFound = false;
- for (int i = 0; i < data.usedFiles.size(); i++) {
- if (data.usedFiles.get(i).getFile().equals(getFileLocation(r))) {
- fileFound = true;
- data.currentWorkbookDefinition = data.usedFiles.get(i);
- break;
- }
+ if (meta.getFile().isFileNameInField()) {
+ if (data.isBeamContext()) {
+ throw new HopException(
+ "Storing filenames in an input field is not supported in Beam pipelines");
}
- if (!fileFound) {
- prepareNextOutputFile(r);
+
+ if (!data.currentWorkbookDefinition.getFile().equals(getFileLocation(r))) {
+ // check if the file is already used and switch or create new file
+ boolean fileFound = false;
+ for (int i = 0; i < data.usedFiles.size(); i++) {
+ if (data.usedFiles.get(i).getFile().equals(getFileLocation(r))) {
+ fileFound = true;
+ data.currentWorkbookDefinition = data.usedFiles.get(i);
+ break;
+ }
+ }
+ if (!fileFound) {
+ prepareNextOutputFile(r);
+ }
}
}
@@ -206,19 +212,19 @@ public class ExcelWriterTransform
}
return true;
} else {
- closeFilesAndDispose();
+ closeFiles();
+ setOutputDone();
return false;
}
}
- public void closeFilesAndDispose() throws HopException {
+ public void closeFiles() throws HopException {
// Close all files and dispose objects
for (ExcelWriterWorkbookDefinition workbookDefinition : data.usedFiles) {
closeOutputFile(workbookDefinition);
}
data.usedFiles.clear();
- setOutputDone();
}
private void createParentFolder(FileObject filename) throws Exception {
@@ -361,6 +367,7 @@ public class ExcelWriterTransform
if (xlsRow == null) {
xlsRow = workbookDefinition.getSheet().createRow(workbookDefinition.getPosY());
}
+
Object v = null;
if (meta.getOutputFields() == null || meta.getOutputFields().isEmpty()) {
// Write all values in stream to text file.
@@ -679,7 +686,13 @@ public class ExcelWriterTransform
* @return current output filename to write to
*/
public String buildFilename(int splitNr) {
- return meta.buildFilename(this, getCopy(), splitNr);
+ return meta.buildFilename(
+ this,
+ getCopy(),
+ splitNr,
+ data.isBeamContext(),
+ log.getLogChannelId(),
+ data.getBeamBundleNr());
}
/**
@@ -704,12 +717,23 @@ public class ExcelWriterTransform
public void prepareNextOutputFile(Object[] row) throws HopException {
try {
+ // Validation
+ //
// sheet name shouldn't exceed 31 character
if (data.realSheetname != null && data.realSheetname.length() > 31) {
throw new HopException(
BaseMessages.getString(
PKG, "ExcelWriterTransform.Exception.MaxSheetName", data.realSheetname));
}
+
+ // Getting field names from input is not supported in a Beam context
+ //
+ if (data.isBeamContext() && meta.getFile().isFileNameInField()) {
+ throw new HopException(
+ BaseMessages.getString(
+ PKG, "ExcelWriterTransform.Exception.FilenameFromFieldNotSupportedInBeam"));
+ }
+
// clear style cache
int numOfFields =
meta.getOutputFields() != null && meta.getOutputFields().size() > 0
@@ -904,7 +928,8 @@ public class ExcelWriterTransform
data.startingCol = 0;
}
- // calculate starting positions
+ // Calculate the starting positions in the sheet.
+ //
int posX;
int posY;
posX = data.startingCol;
@@ -964,7 +989,7 @@ public class ExcelWriterTransform
} catch (Exception e) {
logError("Error opening new file", e);
setErrors(1);
- throw new HopException(e);
+ throw new HopException("Error opening new file", e);
}
}
@@ -1031,18 +1056,29 @@ public class ExcelWriterTransform
return false;
}
- /** pipeline run end */
@Override
- public void dispose() {
- // Call one more time to make sure the files are closed in a Beam context
+ public void startBundle() throws HopException {
+ // Generate a new file for the next bundle
//
- try {
- closeFilesAndDispose();
- } catch (Exception e) {
- throw new RuntimeException("Error closing output files in Excel writer", e);
+ if (!first) {
+ prepareNextOutputFile(null);
}
+ }
- super.dispose();
+ @Override
+ public void finishBundle() throws HopException {
+ closeFiles();
+ }
+
+ @Override
+ public void batchComplete() throws HopException {
+ // Call to make sure the files are closed in a Beam context
+ // On Beam the single threader engine works with one row at a time.
+ // We need to keep the file(s) open until the end of the bundle.
+ //
+ if (!data.isBeamContext()) {
+ closeFiles();
+ }
}
/** Write protect Sheet by setting password works only for xls output at the moment */
diff --git a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformData.java b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformData.java
index 85be37b479..45aa6a932e 100644
--- a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformData.java
+++ b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformData.java
@@ -28,7 +28,6 @@ public class ExcelWriterTransformData extends BaseTransformData implements ITran
public IRowMeta outputRowMeta;
public String realSheetname;
public String realTemplateSheetName;
- public boolean firstFileOpened;
public ArrayList<ExcelWriterWorkbookDefinition> usedFiles = new ArrayList<>();
public ExcelWriterWorkbookDefinition currentWorkbookDefinition;
public int[] fieldnrs;
diff --git a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformMeta.java b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformMeta.java
index 41d7d6ff80..c4aa5f8f3d 100644
--- a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformMeta.java
+++ b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformMeta.java
@@ -333,7 +333,7 @@ public class ExcelWriterTransformMeta
int i = 0;
for (int copy = 0; copy < copies; copy++) {
for (int split = 0; split < splits; split++) {
- retval[i] = buildFilename(variables, copy, split);
+ retval[i] = buildFilename(variables, copy, split, false, "", 1);
i++;
}
}
@@ -344,7 +344,7 @@ public class ExcelWriterTransformMeta
return retval;
}
- public String buildFilename(IVariables variables, int transformnr, int splitnr) {
+ public String buildFilename(IVariables variables, int transformNr, int splitNr, boolean beamContext, String transformId, int bundleNr) {
SimpleDateFormat daf = new SimpleDateFormat();
// Replace possible environment variables...
@@ -370,10 +370,13 @@ public class ExcelWriterTransformMeta
}
}
if (file.isTransformNrInFilename()) {
- retval += "_" + transformnr;
+ retval += "_" + transformNr;
}
if (file.getSplitEvery() > 0) {
- retval += "_" + splitnr;
+ retval += "_" + splitNr;
+ }
+ if (beamContext) {
+ retval+= "_"+transformId+"_"+bundleNr;
}
if (realextension != null && realextension.length() != 0) {
diff --git a/plugins/transforms/excel/src/main/resources/org/apache/hop/pipeline/transforms/excelwriter/messages/messages_en_US.properties b/plugins/transforms/excel/src/main/resources/org/apache/hop/pipeline/transforms/excelwriter/messages/messages_en_US.properties
index cf914d1100..80018b464f 100644
--- a/plugins/transforms/excel/src/main/resources/org/apache/hop/pipeline/transforms/excelwriter/messages/messages_en_US.properties
+++ b/plugins/transforms/excel/src/main/resources/org/apache/hop/pipeline/transforms/excelwriter/messages/messages_en_US.properties
@@ -99,7 +99,7 @@ ExcelWriterDialog.StartingCell.Tooltip=Enter cell reference to start writing at,
ExcelWriterDialog.ContentTab.TabTitle=Content
ExcelWriterDialog.IfSheetExists.Tooltip=If the sheet exists in the file you may choose to delete it and replace it with a fresh sheet or reuse the existing sheet for writing.
ExcelWriterDialog.CreateParentFolder.Label=Create parent folder
-ExcelWriterMeta.Injection.CreateParentFolder.Field=Create parent folder (Y/N)
+ExcelWriterMeta.Injection.CreateParentFolder.Field=Create parent folder
ExcelWriterDialog.CreateParentFolder.Tooltip=Check this if you want to create parent folder\n when necessary. Otherwise, Apache Hop will throw an exception when parent folder doesn''t exist.
ExcelWriterDialog.Extension.Tooltip=This is the file extension of the Excel file. It also implies the file format.
ExcelWriterDialog.RowWritingMethod.PushDown.Label=shift existing cells down
@@ -130,6 +130,7 @@ ExcelWriterDialog.StreamData.Label=Stream XSLX data
ExcelWriterTransform.Exception.MaxSheetName=Sheet name exceeds 31 character: {0}
ExcelWriterTransform.Exception.TemplateNotFound=Template Sheet: {0} not found, aborting
ExcelWriterTransform.Exception.CouldNotPrepareFile=Could not prepare output file {0}
+ExcelWriterTransform.Exception.FilenameFromFieldNotSupportedInBeam=Getting the filename from a field is not supported when running in Beam.
ExcelWriterDialog.TemplateSheetHide.Label=Hide Template Sheet
ExcelWriterDialog.TemplateSheetHide.Tooltip=Ensure that the template sheet will be hidden
ExcelWriterMeta.Tab.Sheetname.Text=Sheet1
@@ -140,50 +141,50 @@ ExcelWriterDialog.FailedToGetFields.DialogMessage=We can not get fields name fro
ExcelWriterMeta.Injection.FileName.Field=Filename
ExcelWriterMeta.Injection.IfFileExists.Field=If output file exists (reuse/new)?
ExcelWriterMeta.Injection.IfSheetExists.Field=If sheet exists in output file (reuse/new)?
-ExcelWriterMeta.Injection.MakeSheetActive.Field=Make this the active sheet (Y/N)?
-ExcelWriterMeta.Injection.ForceFormulaRecalculation.Field=Force formula recalculation (Y/N)?
-ExcelWriterMeta.Injection.LeaveExistingStylesUnchanged.Field=Leave styles of existing cells unchanged (Y/N)?
+ExcelWriterMeta.Injection.MakeSheetActive.Field=Make this the active sheet?
+ExcelWriterMeta.Injection.ForceFormulaRecalculation.Field=Force formula recalculation?
+ExcelWriterMeta.Injection.LeaveExistingStylesUnchanged.Field=Leave styles of existing cells unchanged?
ExcelWriterMeta.Injection.AppendOffset.Field=Offset by ... rows
ExcelWriterMeta.Injection.AppendEmpty.Field=Begin by writing ... empty lines
-ExcelWriterMeta.Injection.AppendOmitHeader.Field=Omit header (Y/N)?
+ExcelWriterMeta.Injection.AppendOmitHeader.Field=Omit header?
ExcelWriterMeta.Injection.RowWritingMethod.Field=Row writing method (overwrite/push)
ExcelWriterMeta.Injection.StartingCell.Field=Start writing at cell
ExcelWriterMeta.Injection.Extension.Field=Extension (xls/xlsx)
ExcelWriterMeta.Injection.Password.Field=Password
ExcelWriterMeta.Injection.ProtectedBy.Field=Protected by user
-ExcelWriterMeta.Injection.HeaderEnabled.Field=Write header (Y/N)?
-ExcelWriterMeta.Injection.FooterEnabled.Field=Write footer (Y/N)?
+ExcelWriterMeta.Injection.HeaderEnabled.Field=Write header?
+ExcelWriterMeta.Injection.FooterEnabled.Field=Write footer?
ExcelWriterMeta.Injection.SplitEvery.Field=Split every ... data rows
-ExcelWriterMeta.Injection.TransformNrInFilename.Field=Include transform nr in filename (Y/N)?
-ExcelWriterMeta.Injection.DateInFilename.Field=Include date in filename (Y/N)?
-ExcelWriterMeta.Injection.AddToResultFilenames.Field=Add filenames to result (Y/N)?
-ExcelWriterMeta.Injection.ProtectSheet.Field=Protect sheet (XLS format only) (Y/N)?
-ExcelWriterMeta.Injection.TimeInFilename.Field=Include time in filename (Y/N)?
-ExcelWriterMeta.Injection.TemplateEnabled.Field=Use template when creating new files (Y/N)?
-ExcelWriterMeta.Injection.TemplateSheetEnabled.Field=Use template when creating new sheets (Y/N)?
-ExcelWriterMeta.Injection.TemplateSheetHidden.Field=Hide template sheet (Y/N)?
+ExcelWriterMeta.Injection.TransformNrInFilename.Field=Include transform split nr in filename?
+ExcelWriterMeta.Injection.DateInFilename.Field=Include date in filename?
+ExcelWriterMeta.Injection.AddToResultFilenames.Field=Add filenames to result?
+ExcelWriterMeta.Injection.ProtectSheet.Field=Protect sheet (XLS format only)?
+ExcelWriterMeta.Injection.TimeInFilename.Field=Include time in filename?
+ExcelWriterMeta.Injection.TemplateEnabled.Field=Use template when creating new files?
+ExcelWriterMeta.Injection.TemplateSheetEnabled.Field=Use template when creating new sheets?
+ExcelWriterMeta.Injection.TemplateSheetHidden.Field=Hide template sheet?
ExcelWriterMeta.Injection.TemplateFileName.Field=Template file
ExcelWriterMeta.Injection.TemplateSheetName.Field=Template sheet name
ExcelWriterMeta.Injection.SheetName.Field=Sheet name
-ExcelWriterMeta.Injection.AppendLines.Field=Append lines at the end of sheet (Y/N)?
-ExcelWriterMeta.Injection.DoNotOpenNewFileInit.Field=Wait for first row before creating new file (Y/N)?
-ExcelWriterMeta.Injection.SpecifyFormat.Field=Specify date/time format (Y/N)?
+ExcelWriterMeta.Injection.AppendLines.Field=Append lines at the end of sheet?
+ExcelWriterMeta.Injection.DoNotOpenNewFileInit.Field=Wait for first row before creating new file?
+ExcelWriterMeta.Injection.SpecifyFormat.Field=Specify date/time format?
ExcelWriterMeta.Injection.DateTimeFormat.Field=Date time format field
-ExcelWriterMeta.Injection.AutoSizeColums.Field=Auto size columns (Y/N)
-ExcelWriterMeta.Injection.StreamingData.Field=Stream XLSX data (Y/N)?
+ExcelWriterMeta.Injection.AutoSizeColums.Field=Auto size columns
+ExcelWriterMeta.Injection.StreamingData.Field=Stream XLSX data?
ExcelWriterMeta.Injection.Fields=Fields
ExcelWriterMeta.Injection.Field=Field
ExcelWriterMeta.Injection.Output.FieldName.Field=Field name
ExcelWriterMeta.Injection.Output.Type.Field=Type
ExcelWriterMeta.Injection.Output.Format.Field=Format
ExcelWriterMeta.Injection.Output.Title.Field=Field title
-ExcelWriterMeta.Injection.Output.FieldContainFormula.Field=Field contains formula (Y/N)?
+ExcelWriterMeta.Injection.Output.FieldContainFormula.Field=Field contains formula?
ExcelWriterMeta.Injection.Output.Hyperlink.Field=Hyperlink
ExcelWriterMeta.Injection.Output.Comment.Field=Cell comment (XLSX)
ExcelWriterMeta.Injection.Output.CommentAuthor.Field=Cell comment author (XLSX)
ExcelWriterMeta.Injection.Output.TitleStyleCell.Field=Header/footer style from cell
ExcelWriterMeta.Injection.Output.StyleCell.Field=Style from cell
-ExcelWriterMeta.Injection.FilenameInField.Field=Filename in field (Y/N)?
+ExcelWriterMeta.Injection.FilenameInField.Field=Filename in field?
ExcelWriterMeta.Injection.FilenameField.Field=Filename field
ExcelWriterTransformMeta.keyword=excel,writer,transform
ExcelWriter.Log.ParentFolderNotExistCreateIt=We can not find folder [{0}]! You need to create it before generating file [{1}].
diff --git a/plugins/transforms/javascript/src/main/java/org/apache/hop/pipeline/transforms/javascript/ScriptValuesDummy.java b/plugins/transforms/javascript/src/main/java/org/apache/hop/pipeline/transforms/javascript/ScriptValuesDummy.java
index 0897d09f7d..706429c699 100644
--- a/plugins/transforms/javascript/src/main/java/org/apache/hop/pipeline/transforms/javascript/ScriptValuesDummy.java
+++ b/plugins/transforms/javascript/src/main/java/org/apache/hop/pipeline/transforms/javascript/ScriptValuesDummy.java
@@ -346,69 +346,54 @@ public class ScriptValuesDummy implements ITransform {
@Override
public boolean isRunning() {
- // TODO Auto-generated method stub
return false;
}
public boolean isUsingThreadPriorityManagment() {
- // TODO Auto-generated method stub
return false;
}
public void setUsingThreadPriorityManagment(boolean usingThreadPriorityManagment) {
- // TODO Auto-generated method stub
-
}
@Override
public void setRunning(boolean running) {
- // TODO Auto-generated method stub
-
}
@Override
public void setStopped(boolean stopped) {
- // TODO Auto-generated method stub
-
}
@Override
public void setSafeStopped(boolean stopped) {
- // TODO Auto-generated method stub
}
@Override
public int rowsetInputSize() {
- // TODO Auto-generated method stub
return 0;
}
@Override
public int rowsetOutputSize() {
- // TODO Auto-generated method stub
return 0;
}
@Override
public long getProcessed() {
- // TODO Auto-generated method stub
return 0;
}
@Override
public Map<String, ResultFile> getResultFiles() {
- // TODO Auto-generated method stub
return null;
}
public long getRuntime() {
- // TODO Auto-generated method stub
return 0;
}
@Override
public EngineComponent.ComponentExecutionStatus getStatus() {
- // TODO Auto-generated method stub
return null;
}
@@ -429,31 +414,23 @@ public class ScriptValuesDummy implements ITransform {
@Override
public boolean isPaused() {
- // TODO Auto-generated method stub
return false;
}
@Override
public void identifyErrorOutput() {
- // TODO Auto-generated method stub
-
}
@Override
public void setPartitioned(boolean partitioned) {
- // TODO Auto-generated method stub
-
}
@Override
public void setRepartitioning(int partitioningMethod) {
- // TODO Auto-generated method stub
-
}
@Override
public boolean canProcessOneRow() {
- // TODO Auto-generated method stub
return false;
}
@@ -463,68 +440,60 @@ public class ScriptValuesDummy implements ITransform {
}
public boolean isWaitingForData() {
- // TODO Auto-generated method stub
return false;
}
public void setWaitingForData(boolean waitingForData) {
- // TODO Auto-generated method stub
}
public boolean isIdle() {
- // TODO Auto-generated method stub
return false;
}
public boolean isPassingData() {
- // TODO Auto-generated method stub
return false;
}
public void setPassingData(boolean passingData) {
- // TODO Auto-generated method stub
-
}
@Override
public void batchComplete() throws HopException {
- // TODO Auto-generated method stub
}
@Override
- public void setMetadataProvider(IHopMetadataProvider metadataProvider) {
- // TODO Auto-generated method stub
+ public void startBundle() throws HopException {
+ }
+
+ @Override
+ public void finishBundle() throws HopException {
+ }
+ @Override
+ public void setMetadataProvider(IHopMetadataProvider metadataProvider) {
}
@Override
public IHopMetadataProvider getMetadataProvider() {
- // TODO Auto-generated method stub
return null;
}
@Override
public int getCurrentInputRowSetNr() {
- // TODO Auto-generated method stub
return 0;
}
@Override
public void setCurrentOutputRowSetNr(int index) {
- // TODO Auto-generated method stub
-
}
@Override
public int getCurrentOutputRowSetNr() {
- // TODO Auto-generated method stub
return 0;
}
@Override
public void setCurrentInputRowSetNr(int index) {
- // TODO Auto-generated method stub
-
}
@Override
diff --git a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/BaseFileOutputMeta.java b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/BaseFileOutputMeta.java
index ec8d602efb..d1f9255716 100644
--- a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/BaseFileOutputMeta.java
+++ b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/BaseFileOutputMeta.java
@@ -26,17 +26,15 @@ import java.text.SimpleDateFormat;
import java.util.Date;
/** A base implementation for all output file based metas. */
-public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends JsonOutputData> extends BaseTransformMeta<Main, Data> {
+public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends JsonOutputData>
+ extends BaseTransformMeta<Main, Data> {
/** Flag: add the transformnr in the filename */
- @HopMetadataProperty(
- injectionKeyDescription = "JsonOutput.Injection.INC_TRANSFORMNR_IN_FILENAME"
- )
+ @HopMetadataProperty(injectionKeyDescription = "JsonOutput.Injection.INC_TRANSFORMNR_IN_FILENAME")
protected boolean transformNrInFilename;
/** Flag: add the partition number in the filename */
- @HopMetadataProperty(
- injectionKeyDescription = "JsonOutput.Injection.INC_PARTNR_IN_FILENAME")
+ @HopMetadataProperty(injectionKeyDescription = "JsonOutput.Injection.INC_PARTNR_IN_FILENAME")
protected boolean partNrInFilename;
/** Flag: add the date in the filename */
@@ -131,7 +129,7 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
return partNrInFilename;
}
- public void setPartNrInFilename(boolean partNrInFilename){
+ public void setPartNrInFilename(boolean partNrInFilename) {
this.partNrInFilename = partNrInFilename;
}
@@ -139,7 +137,7 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
return transformNrInFilename;
}
- public void setTransformNrInFilename(boolean transformNrInFilename){
+ public void setTransformNrInFilename(boolean transformNrInFilename) {
this.transformNrInFilename = transformNrInFilename;
}
@@ -202,6 +200,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
transform + "",
getPartPrefix() + part,
split + "",
+ false,
+ "",
+ 0,
now,
false,
showSamples);
@@ -222,6 +223,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
"<transform>",
"<partition>",
"<split>",
+ false,
+ "",
+ 0,
now,
false,
showSamples)
@@ -238,8 +242,20 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
final String copyNr,
final String partitionNr,
final String splitNr,
+ final boolean beamContext,
+ final String transformId,
+ final int bundleNr,
final boolean ziparchive) {
- return buildFilename(variables, copyNr, partitionNr, splitNr, ziparchive, true);
+ return buildFilename(
+ variables,
+ copyNr,
+ partitionNr,
+ splitNr,
+ beamContext,
+ transformId,
+ bundleNr,
+ ziparchive,
+ true);
}
public String buildFilename(
@@ -247,6 +263,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
final String transformnr,
final String partnr,
final String splitnr,
+ final boolean beamContext,
+ final String transformId,
+ final int bundleNr,
final boolean ziparchive,
final boolean showSamples) {
@@ -259,6 +278,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
transformnr,
partnr,
splitnr,
+ beamContext,
+ transformId,
+ bundleNr,
new Date(),
ziparchive,
showSamples);
@@ -270,6 +292,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
final String transformnr,
final String partnr,
final String splitnr,
+ final boolean beamContext,
+ final String transformId,
+ final int bundleNr,
final Date date,
final boolean ziparchive,
final boolean showSamples) {
@@ -279,6 +304,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
transformnr,
partnr,
splitnr,
+ beamContext,
+ transformId,
+ bundleNr,
date,
ziparchive,
showSamples,
@@ -291,6 +319,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
final String transformnr,
final String partnr,
final String splitnr,
+ final boolean beamContext,
+ final String transformId,
+ final int bundleNr,
final Date date,
final boolean ziparchive,
final boolean showSamples,
@@ -302,6 +333,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
transformnr,
partnr,
splitnr,
+ beamContext,
+ transformId,
+ bundleNr,
date,
ziparchive,
showSamples,
@@ -315,6 +349,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
final String transformnr,
final String partnr,
final String splitnr,
+ final boolean beamContext,
+ final String transformId,
+ final int bundleNr,
final Date date,
final boolean ziparchive,
final boolean showSamples,
@@ -365,6 +402,10 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
retval += "_" + splitnr;
}
+ if (beamContext) {
+ retval += "_" + transformId + "_" + bundleNr;
+ }
+
if ("Zip".equals(meta.getFileCompression())) {
if (ziparchive) {
retval += ".zip";
diff --git a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutput.java b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutput.java
index 662d6b87f6..df4dcc91e9 100644
--- a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutput.java
+++ b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutput.java
@@ -113,7 +113,7 @@ public class JsonOutput extends BaseTransform<JsonOutputMeta, JsonOutputData> {
if (data.nrRowsInBloc > 0 && data.nrRow % data.nrRowsInBloc == 0) {
// We can now output an object
- outPutRow(row);
+ outputRow(row);
}
}
}
@@ -165,7 +165,7 @@ public class JsonOutput extends BaseTransform<JsonOutputMeta, JsonOutputData> {
if (data.nrRowsInBloc > 0 && data.nrRow % data.nrRowsInBloc == 0) {
// We can now output an object
- outPutRow(row);
+ outputRow(row);
}
}
}
@@ -176,11 +176,7 @@ public class JsonOutput extends BaseTransform<JsonOutputMeta, JsonOutputData> {
public boolean processRow() throws HopException {
Object[] r = getRow(); // This also waits for a row to be finished.
if (r == null) {
- // no more input to be expected...
- if (!data.rowsAreSafe) {
- // Let's output the remaining unsafe data
- outPutRow(r);
- }
+ writeJsonToFile();
setOutputDone();
return false;
@@ -220,14 +216,22 @@ public class JsonOutput extends BaseTransform<JsonOutputMeta, JsonOutputData> {
return true;
}
+ private void writeJsonToFile() throws HopTransformException {
+ // no more input to be expected...
+ if (!data.rowsAreSafe) {
+ // Let's output the remaining unsafe data
+ outputRow(null);
+ }
+ }
+
@SuppressWarnings("unchecked")
- private void outPutRow(Object[] rowData) throws HopTransformException {
+ private void outputRow(Object[] rowData) throws HopTransformException {
// We can now output an object
data.jg = new JSONObject();
data.jg.put(data.realBlocName, data.ja);
String value = data.jg.toJSONString();
- if (data.outputValue && data.outputRowMeta != null) {
+ if (rowData != null && data.outputValue && data.outputRowMeta != null) {
Object[] outputRowData = RowDataUtil.addValueData(rowData, data.inputRowMetaSize, value);
incrementLinesOutput();
putRow(data.outputRowMeta, outputRowData);
@@ -300,7 +304,7 @@ public class JsonOutput extends BaseTransform<JsonOutputMeta, JsonOutputData> {
//
if (data.ja.size() > 0) {
try {
- outPutRow(null);
+ outputRow(null);
} catch (Exception e) {
log.logError("Error writing final rows to disk", e);
}
@@ -314,6 +318,25 @@ public class JsonOutput extends BaseTransform<JsonOutputMeta, JsonOutputData> {
super.dispose();
}
+ @Override
+ public void startBundle() throws HopException {
+ if (!first) {
+ openNewFile();
+ }
+ }
+
+ @Override
+ public void batchComplete() throws HopException {
+ if (!data.isBeamContext()) {
+ writeJsonToFile();
+ }
+ }
+
+ @Override
+ public void finishBundle() throws HopException {
+ writeJsonToFile();
+ }
+
private void createParentFolder(String filename) throws HopTransformException {
if (!meta.isCreateParentFolder()) {
return;
@@ -399,7 +422,15 @@ public class JsonOutput extends BaseTransform<JsonOutputMeta, JsonOutputData> {
}
public String buildFilename() {
- return meta.buildFilename(variables, getCopy() + "", null, data.splitnr + "", false);
+ return meta.buildFilename(
+ variables,
+ getCopy() + "",
+ null,
+ data.splitnr + "",
+ data.isBeamContext(),
+ log.getLogChannelId(),
+ data.getBeamBundleNr(),
+ false);
}
protected boolean closeFile() {
diff --git a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutputMeta.java b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutputMeta.java
index c56252d3f8..b6b1ad7ac8 100644
--- a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutputMeta.java
+++ b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutputMeta.java
@@ -115,6 +115,7 @@ public class JsonOutputMeta extends BaseFileOutputMeta<JsonOutput, JsonOutputDat
@HopMetadataProperty(injectionKeyDescription = "JsonOutput.Injection.CREATE_PARENT_FOLDER")
private boolean createParentFolder;
+ @HopMetadataProperty(injectionKeyDescription = "JsonOutput.Injection.DONT_CREATE_AT_START")
private boolean doNotOpenNewFileInit;
public JsonOutputMeta() {
@@ -222,6 +223,7 @@ public class JsonOutputMeta extends BaseFileOutputMeta<JsonOutput, JsonOutputDat
nrRowsInBloc = "1";
operationType = OPERATION_TYPE_WRITE_TO_FILE;
extension = "json";
+ doNotOpenNewFileInit=true;
int nrFields = 0;
for (int i = 0; i < nrFields; i++) {
diff --git a/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRows.java b/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRows.java
index 755eae0d06..92143d99a9 100644
--- a/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRows.java
+++ b/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRows.java
@@ -626,6 +626,10 @@ public class SortRows extends BaseTransform<SortRowsMeta, SortRowsData> {
}
}
+ @Override
+ public void startBundle() throws HopException {
+ }
+
/**
* Calling this method will alert the transform that we finished passing records to the transform.
* Specifically for transforms like "Sort Rows" it means that the buffered rows can be sorted and
@@ -633,6 +637,15 @@ public class SortRows extends BaseTransform<SortRowsMeta, SortRowsData> {
*/
@Override
public void batchComplete() throws HopException {
+ if (!data.isBeamContext()) {
+ preSortBeforeFlush();
+ passBuffer();
+ setOutputDone();
+ }
+ }
+
+ @Override
+ public void finishBundle() throws HopException {
preSortBeforeFlush();
passBuffer();
setOutputDone();
diff --git a/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRowsData.java b/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRowsData.java
index b4b804d17f..eb4b8d534f 100644
--- a/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRowsData.java
+++ b/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRowsData.java
@@ -51,7 +51,7 @@ public class SortRowsData extends BaseTransformData implements ITransformData {
public int[] convertKeysToNative;
public boolean convertAnyKeysToNative;
- Comparator<RowTempFile> comparator;
+ Comparator<RowTempFile> comparator;
Comparator<Object[]> rowComparator;
public int freeCounter;
diff --git a/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutput.java b/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutput.java
index 5ff88b7092..aa08142016 100644
--- a/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutput.java
+++ b/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutput.java
@@ -769,6 +769,9 @@ public class TextFileOutput<Meta extends TextFileOutputMeta, Data extends TextFi
getCopy(),
getPartitionId(),
data.splitnr,
+ data.isBeamContext(),
+ log.getLogChannelId(),
+ data.getBeamBundleNr(),
ziparchive,
meta);
}
@@ -902,6 +905,7 @@ public class TextFileOutput<Meta extends TextFileOutputMeta, Data extends TextFi
protected void close() throws IOException {
if (!meta.isServletOutput()) {
data.getFileStreamsCollection().flushOpenFiles(true);
+ data.writer = null;
}
}
@@ -1031,4 +1035,28 @@ public class TextFileOutput<Meta extends TextFileOutputMeta, Data extends TextFi
throws HopFileException {
return HopVfs.getOutputStream(vfsFilename, append);
}
+
+ @Override
+ public void startBundle() throws HopException {
+ }
+
+ @Override
+ public void batchComplete() throws HopException {
+ if (!data.isBeamContext()) {
+ try {
+ close();
+ } catch (IOException e) {
+ throw new HopException("Error closing file(s)", e);
+ }
+ }
+ }
+
+ @Override
+ public void finishBundle() throws HopException {
+ try {
+ close();
+ } catch (IOException e) {
+ throw new HopException("Error closing file(s)", e);
+ }
+ }
}
diff --git a/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMeta.java b/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMeta.java
index f9d7f9fb08..3d3bb05f4d 100644
--- a/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMeta.java
+++ b/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMeta.java
@@ -227,149 +227,207 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
this.servletOutput = servletOutput;
}
- /** @param createparentfolder The createparentfolder to set. */
+ /**
+ * @param createparentfolder The createparentfolder to set.
+ */
public void setCreateParentFolder(boolean createparentfolder) {
this.createparentfolder = createparentfolder;
}
- /** @return Returns the createparentfolder. */
+ /**
+ * @return Returns the createparentfolder.
+ */
public boolean isCreateParentFolder() {
return createparentfolder;
}
- /** @param dateInFilename The dateInFilename to set. */
+ /**
+ * @param dateInFilename The dateInFilename to set.
+ */
public void setDateInFilename(boolean dateInFilename) {
this.dateInFilename = dateInFilename;
}
- /** @return Returns the enclosure. */
+ /**
+ * @return Returns the enclosure.
+ */
public String getEnclosure() {
return enclosure;
}
- /** @param enclosure The enclosure to set. */
+ /**
+ * @param enclosure The enclosure to set.
+ */
public void setEnclosure(String enclosure) {
this.enclosure = enclosure;
}
- /** @return Returns the enclosureForced. */
+ /**
+ * @return Returns the enclosureForced.
+ */
public boolean isEnclosureForced() {
return enclosureForced;
}
- /** @param enclosureForced The enclosureForced to set. */
+ /**
+ * @param enclosureForced The enclosureForced to set.
+ */
public void setEnclosureForced(boolean enclosureForced) {
this.enclosureForced = enclosureForced;
}
- /** @return Returns the enclosureFixDisabled. */
+ /**
+ * @return Returns the enclosureFixDisabled.
+ */
public boolean isEnclosureFixDisabled() {
return disableEnclosureFix;
}
- /** @param disableEnclosureFix The enclosureFixDisabled to set. */
+ /**
+ * @param disableEnclosureFix The enclosureFixDisabled to set.
+ */
public void setEnclosureFixDisabled(boolean disableEnclosureFix) {
this.disableEnclosureFix = disableEnclosureFix;
}
- /** @return Returns the add to result filesname. */
+ /**
+ * @return Returns the add to result filesname.
+ */
public boolean isAddToResultFiles() {
return addToResultFilenames;
}
- /** @param addtoresultfilenamesin The addtoresultfilenames to set. */
+ /**
+ * @param addtoresultfilenamesin The addtoresultfilenames to set.
+ */
public void setAddToResultFiles(boolean addtoresultfilenamesin) {
this.addToResultFilenames = addtoresultfilenamesin;
}
- /** @return Returns the fileAppended. */
+ /**
+ * @return Returns the fileAppended.
+ */
public boolean isFileAppended() {
return fileAppended;
}
- /** @param fileAppended The fileAppended to set. */
+ /**
+ * @param fileAppended The fileAppended to set.
+ */
public void setFileAppended(boolean fileAppended) {
this.fileAppended = fileAppended;
}
- /** @return Returns the fileFormat. */
+ /**
+ * @return Returns the fileFormat.
+ */
public String getFileFormat() {
return fileFormat;
}
- /** @param fileFormat The fileFormat to set. */
+ /**
+ * @param fileFormat The fileFormat to set.
+ */
@Injection(name = "FORMAT")
public void setFileFormat(String fileFormat) {
this.fileFormat = fileFormat;
this.newline = getNewLine(fileFormat);
}
- /** @return Returns the footer. */
+ /**
+ * @return Returns the footer.
+ */
public boolean isFooterEnabled() {
return footerEnabled;
}
- /** @param footer The footer to set. */
+ /**
+ * @param footer The footer to set.
+ */
public void setFooterEnabled(boolean footer) {
this.footerEnabled = footer;
}
- /** @return Returns the header. */
+ /**
+ * @return Returns the header.
+ */
public boolean isHeaderEnabled() {
return headerEnabled;
}
- /** @param header The header to set. */
+ /**
+ * @param header The header to set.
+ */
public void setHeaderEnabled(boolean header) {
this.headerEnabled = header;
}
- /** @return Returns the newline. */
+ /**
+ * @return Returns the newline.
+ */
public String getNewline() {
return newline;
}
- /** @param newline The newline to set. */
+ /**
+ * @param newline The newline to set.
+ */
public void setNewline(String newline) {
this.newline = newline;
}
- /** @return Returns the padded. */
+ /**
+ * @return Returns the padded.
+ */
public boolean isPadded() {
return padded;
}
- /** @param padded The padded to set. */
+ /**
+ * @param padded The padded to set.
+ */
public void setPadded(boolean padded) {
this.padded = padded;
}
- /** @return Returns the fastDump. */
+ /**
+ * @return Returns the fastDump.
+ */
public boolean isFastDump() {
return fastDump;
}
- /** @param fastDump The fastDump to set. */
+ /**
+ * @param fastDump The fastDump to set.
+ */
public void setFastDump(boolean fastDump) {
this.fastDump = fastDump;
}
- /** @return Returns the separator. */
+ /**
+ * @return Returns the separator.
+ */
public String getSeparator() {
return separator;
}
- /** @param separator The separator to set. */
+ /**
+ * @param separator The separator to set.
+ */
public void setSeparator(String separator) {
this.separator = separator;
}
- /** @return Returns the "do not open new file at init" flag. */
+ /**
+ * @return Returns the "do not open new file at init" flag.
+ */
public boolean isDoNotOpenNewFileInit() {
return doNotOpenNewFileInit;
}
- /** @param doNotOpenNewFileInit The "do not open new file at init" flag to set. */
+ /**
+ * @param doNotOpenNewFileInit The "do not open new file at init" flag to set.
+ */
public void setDoNotOpenNewFileInit(boolean doNotOpenNewFileInit) {
this.doNotOpenNewFileInit = doNotOpenNewFileInit;
}
@@ -378,7 +436,7 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
* @return Returns the splitEvery.
* @deprecated use {@link #getSplitEvery(IVariables)} or {@link #getSplitEveryRows()}
*/
- @Deprecated(since="2.0")
+ @Deprecated(since = "2.0")
public int getSplitEvery() {
return Const.toInt(splitEveryRows, 0);
}
@@ -391,17 +449,23 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
return Const.toInt(varSpace == null ? splitEveryRows : varSpace.resolve(splitEveryRows), 0);
}
- /** @return At how many rows to split into a new file. */
+ /**
+ * @return At how many rows to split into a new file.
+ */
public String getSplitEveryRows() {
return splitEveryRows;
}
- /** @param value At how many rows to split into a new file. */
+ /**
+ * @param value At how many rows to split into a new file.
+ */
public void setSplitEveryRows(String value) {
splitEveryRows = value;
}
- /** @return <tt>1</tt> if <tt>isFooterEnabled()</tt> and <tt>0</tt> otherwise */
+ /**
+ * @return <tt>1</tt> if <tt>isFooterEnabled()</tt> and <tt>0</tt> otherwise
+ */
public int getFooterShift() {
return isFooterEnabled() ? 1 : 0;
}
@@ -410,32 +474,42 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
* @param splitEvery The splitEvery to set.
* @deprecated use {@link #setSplitEveryRows(String)}
*/
- @Deprecated(since="2.0")
+ @Deprecated(since = "2.0")
public void setSplitEvery(int splitEvery) {
splitEveryRows = Integer.toString(splitEvery);
}
- /** @param transformNrInFilename The transformNrInFilename to set. */
+ /**
+ * @param transformNrInFilename The transformNrInFilename to set.
+ */
public void setTransformNrInFilename(boolean transformNrInFilename) {
this.transformNrInFilename = transformNrInFilename;
}
- /** @param partNrInFilename The partNrInFilename to set. */
+ /**
+ * @param partNrInFilename The partNrInFilename to set.
+ */
public void setPartNrInFilename(boolean partNrInFilename) {
this.partNrInFilename = partNrInFilename;
}
- /** @param timeInFilename The timeInFilename to set. */
+ /**
+ * @param timeInFilename The timeInFilename to set.
+ */
public void setTimeInFilename(boolean timeInFilename) {
this.timeInFilename = timeInFilename;
}
- /** @return Returns the outputFields. */
+ /**
+ * @return Returns the outputFields.
+ */
public TextFileField[] getOutputFields() {
return outputFields;
}
- /** @param outputFields The outputFields to set. */
+ /**
+ * @param outputFields The outputFields to set.
+ */
public void setOutputFields(TextFileField[] outputFields) {
this.outputFields = outputFields;
}
@@ -456,7 +530,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
this.encoding = encoding;
}
- /** @return The desired last line in the output file, null or empty if nothing has to be added. */
+ /**
+ * @return The desired last line in the output file, null or empty if nothing has to be added.
+ */
public String getEndedLine() {
return endedLine;
}
@@ -469,22 +545,30 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
this.endedLine = endedLine;
}
- /** @return Is the file name coded in a field? */
+ /**
+ * @return Is the file name coded in a field?
+ */
public boolean isFileNameInField() {
return fileNameInField;
}
- /** @param fileNameInField Is the file name coded in a field? */
+ /**
+ * @param fileNameInField Is the file name coded in a field?
+ */
public void setFileNameInField(boolean fileNameInField) {
this.fileNameInField = fileNameInField;
}
- /** @return The field name that contains the output file name. */
+ /**
+ * @return The field name that contains the output file name.
+ */
public String getFileNameField() {
return fileNameField;
}
- /** @param fileNameField Name of the field that contains the file name */
+ /**
+ * @param fileNameField Name of the field that contains the file name
+ */
public void setFileNameField(String fileNameField) {
this.fileNameField = fileNameField;
}
@@ -665,7 +749,7 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
setFileCompression(fileCompressionTypeCodes[FILE_COMPRESSION_TYPE_NONE]);
fileName = "file";
servletOutput = false;
- doNotOpenNewFileInit = false;
+ doNotOpenNewFileInit = true;
extension = "txt";
transformNrInFilename = false;
partNrInFilename = false;
@@ -705,6 +789,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
int transformnr,
String partnr,
int splitnr,
+ boolean beamContext,
+ String transformId,
+ int bundleNr,
boolean ziparchive,
TextFileOutputMeta meta) {
@@ -717,6 +804,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
Integer.toString(transformnr),
partnr,
Integer.toString(splitnr),
+ beamContext,
+ transformId,
+ bundleNr,
new Date(),
ziparchive,
true,
@@ -1084,6 +1174,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
transform + "",
getPartPrefix() + part,
split + "",
+ false,
+ "",
+ 1,
now,
false,
showSamples);
@@ -1104,6 +1197,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
"<transform>",
"<partition>",
"<split>",
+ false,
+ "",
+ 1,
now,
false,
showSamples)
@@ -1115,43 +1211,15 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
return "";
}
- public String buildFilename(
- final IVariables variables,
- final String transformnr,
- final String partnr,
- final String splitnr,
- final boolean ziparchive) {
- return buildFilename(variables, transformnr, partnr, splitnr, ziparchive, true);
- }
-
- public String buildFilename(
- final IVariables variables,
- final String transformnr,
- final String partnr,
- final String splitnr,
- final boolean ziparchive,
- final boolean showSamples) {
-
- String realFileName = variables.resolve(fileName);
- String realExtension = variables.resolve(extension);
-
- return buildFilename(
- realFileName,
- realExtension,
- transformnr,
- partnr,
- splitnr,
- new Date(),
- ziparchive,
- showSamples);
- }
-
private String buildFilename(
final String realFileName,
final String realExtension,
final String transformnr,
final String partnr,
final String splitnr,
+ final boolean beamContext,
+ final String transformId,
+ final int bundleNr,
final Date date,
final boolean ziparchive,
final boolean showSamples) {
@@ -1161,6 +1229,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
transformnr,
partnr,
splitnr,
+ beamContext,
+ transformId,
+ bundleNr,
date,
ziparchive,
showSamples,
@@ -1173,6 +1244,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
final String transformnr,
final String partnr,
final String splitnr,
+ final boolean beamContext,
+ final String transformId,
+ final int bundleNr,
final Date date,
final boolean ziparchive,
final boolean showSamples,
@@ -1184,6 +1258,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
transformnr,
partnr,
splitnr,
+ beamContext,
+ transformId,
+ bundleNr,
date,
ziparchive,
showSamples,
@@ -1197,6 +1274,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
final String transformnr,
final String partnr,
final String splitnr,
+ final boolean beamContext,
+ final String transformId,
+ final int bundleNr,
final Date date,
final boolean ziparchive,
final boolean showSamples,
@@ -1246,6 +1326,11 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
if (meta.getSplitEvery(variables) > 0) {
retval += "_" + splitnr;
}
+ // In a Beam context, always add the transform ID and bundle number
+ //
+ if (beamContext) {
+ retval += "_" + transformId + "_" + bundleNr;
+ }
if ("Zip".equals(meta.getFileCompression())) {
if (ziparchive) {
diff --git a/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMetaTest.java b/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMetaTest.java
index b07b4daa0f..92f35c9b3e 100644
--- a/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMetaTest.java
+++ b/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMetaTest.java
@@ -176,11 +176,11 @@ public class TextFileOutputMetaTest {
meta.setSplitEveryRows("${splitVar}");
IVariables varSpace = new Variables();
assertEquals(0, meta.getSplitEvery(varSpace));
- String fileName = meta.buildFilename("foo", "txt2", varSpace, 0, null, 3, false, meta);
+ String fileName = meta.buildFilename("foo", "txt2", varSpace, 0, null, 3, false, null, 0, false, meta);
assertEquals("foo.txt2", fileName);
varSpace.setVariable("splitVar", "2");
assertEquals(2, meta.getSplitEvery(varSpace));
- fileName = meta.buildFilename("foo", "txt2", varSpace, 0, null, 5, false, meta);
+ fileName = meta.buildFilename("foo", "txt2", varSpace, 0, null, 5, false, null, 0, false, meta);
assertEquals("foo_5.txt2", fileName);
}
diff --git a/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputTest.java b/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputTest.java
index f58f0f8e71..c51d84ad7a 100644
--- a/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputTest.java
+++ b/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputTest.java
@@ -371,6 +371,9 @@ public class TextFileOutputTest {
Mockito.anyString(),
Mockito.anyInt(),
Mockito.anyBoolean(),
+ Mockito.anyString(),
+ Mockito.anyInt(),
+ Mockito.anyBoolean(),
Mockito.any(TextFileOutputMeta.class)))
.thenReturn(TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION);
@@ -461,6 +464,9 @@ public class TextFileOutputTest {
Mockito.anyInt(),
Mockito.anyString(),
Mockito.anyInt(),
+ Mockito.anyBoolean(),
+ Mockito.anyString(),
+ Mockito.anyInt(),
Mockito.anyBoolean(),
Mockito.any(TextFileOutputMeta.class)))
.thenReturn(pathToFile);
@@ -642,6 +648,9 @@ public class TextFileOutputTest {
0,
null,
0,
+ false,
+ null,
+ 0,
true,
transformMockHelper.iTransformMeta))
.thenReturn(TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION);
@@ -719,6 +728,9 @@ public class TextFileOutputTest {
0,
null,
0,
+ false,
+ null,
+ 0,
true,
transformMockHelper.iTransformMeta))
.thenReturn(TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION);