You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hop.apache.org by mc...@apache.org on 2022/09/22 20:19:05 UTC

[hop] branch master updated: HOP-4172 : Forcing to a single thread in Beam doesn't work (fix, IT)

This is an automated email from the ASF dual-hosted git repository.

mcasters pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hop.git


The following commit(s) were added to refs/heads/master by this push:
     new b43d852493 HOP-4172 : Forcing to a single thread in Beam doesn't work (fix, IT)
     new 90ba057bde Merge pull request #1704 from mattcasters/master
b43d852493 is described below

commit b43d852493280ebd6ea48086b484ae04d8690cd9
Author: Matt Casters <ma...@gmail.com>
AuthorDate: Thu Sep 22 21:48:27 2022 +0200

    HOP-4172 : Forcing to a single thread in Beam doesn't work (fix, IT)
---
 assemblies/plugins/tech/parquet/pom.xml            |   5 +
 .../plugins/tech/parquet/src/assembly/assembly.xml |   1 +
 core/src/main/java/org/apache/hop/core/Const.java  |   4 +
 .../pipeline/SingleThreadedPipelineExecutor.java   |  27 +-
 .../hop/pipeline/transform/BaseTransform.java      |  10 +-
 .../hop/pipeline/transform/BaseTransformData.java  |  42 ++
 .../apache/hop/pipeline/transform/ITransform.java  |  19 +-
 .../hop/pipeline/transform/ITransformData.java     |  29 ++
 ...pl => 0005-generate-single-file-validation.hpl} | 172 ++++++-
 ...json-file.hpl => 0005-generate-single-file.hpl} | 183 +++++--
 ...l => 0006-generate-bundle-files-validation.hpl} | 165 +++++-
 ...son-file.hpl => 0006-generate-bundle-files.hpl} | 181 +++++--
 .../beam_directrunner/main-0005-single-thread.hwf  |  12 +-
 ...ead.hwf => main-0006-generate-bundle-files.hwf} |  20 +-
 ...0005-generate-single-file-validation UNIT.json} |  65 ++-
 ...006-generate-bundle-files-validation UNIT.json} |  65 ++-
 .../gcp/0007-single-thread-validation.hpl          | 158 +++++-
 integration-tests/gcp/0007-single-thread.hpl       | 212 ++++----
 .../0007-single-thread-validation UNIT.json        |  49 ++
 .../hop/beam/core/transform/TransformBaseFn.java   |  39 ++
 .../core/transform/TransformBatchTransform.java    |  35 --
 .../hop/beam/core/transform/TransformFn.java       | 551 +++++++++++----------
 .../beam/core/transform/TransformTransform.java    |  24 -
 .../HopPipelineMetaToBeamPipelineConverter.java    |   6 +-
 .../apache/hop/neo4j/transforms/cypher/Cypher.java |   4 +-
 plugins/tech/parquet/pom.xml                       |   2 +-
 .../parquet/transforms/output/ParquetOutput.java   |  22 +
 .../excelwriter/ExcelWriterFileField.java          |   2 +-
 .../excelwriter/ExcelWriterTransform.java          | 102 ++--
 .../excelwriter/ExcelWriterTransformData.java      |   1 -
 .../excelwriter/ExcelWriterTransformMeta.java      |  11 +-
 .../excelwriter/messages/messages_en_US.properties |  45 +-
 .../transforms/javascript/ScriptValuesDummy.java   |  47 +-
 .../transforms/jsonoutput/BaseFileOutputMeta.java  |  59 ++-
 .../pipeline/transforms/jsonoutput/JsonOutput.java |  53 +-
 .../transforms/jsonoutput/JsonOutputMeta.java      |   2 +
 .../hop/pipeline/transforms/sort/SortRows.java     |  13 +
 .../hop/pipeline/transforms/sort/SortRowsData.java |   2 +-
 .../transforms/textfileoutput/TextFileOutput.java  |  28 ++
 .../textfileoutput/TextFileOutputMeta.java         | 237 ++++++---
 .../textfileoutput/TextFileOutputMetaTest.java     |   4 +-
 .../textfileoutput/TextFileOutputTest.java         |  12 +
 42 files changed, 1918 insertions(+), 802 deletions(-)

diff --git a/assemblies/plugins/tech/parquet/pom.xml b/assemblies/plugins/tech/parquet/pom.xml
index 386e4ec8e7..7821315428 100644
--- a/assemblies/plugins/tech/parquet/pom.xml
+++ b/assemblies/plugins/tech/parquet/pom.xml
@@ -123,6 +123,11 @@
             <version>${hadoop.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.fasterxml.woodstox</groupId>
+            <artifactId>woodstox-core</artifactId>
+            <version>5.3.0</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/assemblies/plugins/tech/parquet/src/assembly/assembly.xml b/assemblies/plugins/tech/parquet/src/assembly/assembly.xml
index c2c6bc0ebd..2d7ae7ef84 100644
--- a/assemblies/plugins/tech/parquet/src/assembly/assembly.xml
+++ b/assemblies/plugins/tech/parquet/src/assembly/assembly.xml
@@ -51,6 +51,7 @@
             <useProjectArtifact>false</useProjectArtifact>
             <scope>runtime</scope>
             <includes>
+                <include>com.fasterxml.woodstox:woodstox-core</include>
                 <include>com.github.luben:zstd-jni:jar</include>
                 <include>com.google.code.findbugs:jsr305:jar</include>
                 <include>com.google.protobuf:protobuf-java:jar</include>
diff --git a/core/src/main/java/org/apache/hop/core/Const.java b/core/src/main/java/org/apache/hop/core/Const.java
index 9adf5d39f1..aff5930bdb 100644
--- a/core/src/main/java/org/apache/hop/core/Const.java
+++ b/core/src/main/java/org/apache/hop/core/Const.java
@@ -342,6 +342,10 @@ public class Const {
   public static final String INTERNAL_VARIABLE_TRANSFORM_ID =
           INTERNAL_VARIABLE_PREFIX + ".Transform.ID";
 
+  public static final String INTERNAL_VARIABLE_TRANSFORM_BUNDLE_NR =
+          INTERNAL_VARIABLE_PREFIX + ".Transform.BundleNr";
+
+
   public static final String INTERNAL_VARIABLE_ACTION_ID =
           INTERNAL_VARIABLE_PREFIX + ".Action.ID";
 
diff --git a/engine/src/main/java/org/apache/hop/pipeline/SingleThreadedPipelineExecutor.java b/engine/src/main/java/org/apache/hop/pipeline/SingleThreadedPipelineExecutor.java
index 75109f83fd..420ba9b3b7 100644
--- a/engine/src/main/java/org/apache/hop/pipeline/SingleThreadedPipelineExecutor.java
+++ b/engine/src/main/java/org/apache/hop/pipeline/SingleThreadedPipelineExecutor.java
@@ -387,7 +387,8 @@ public class SingleThreadedPipelineExecutor {
             "An exception was raised during a transform's execution: "
                 + inProcessCombi.transformName);
         this.exceptionsRaisedCounter += 1;
-      } else throw new HopException("Error performing an iteration in a single threaded pipeline", e);
+      } else
+        throw new HopException("Error performing an iteration in a single threaded pipeline", e);
     }
     return nrDone < transforms.size() && !pipeline.isStopped();
   }
@@ -430,12 +431,12 @@ public class SingleThreadedPipelineExecutor {
                 String.valueOf(lu),
                 String.valueOf(e + lj)));
       }
-      ((BaseTransform<?,?>) combi.transform).setLinesInput(0);
-      ((BaseTransform<?,?>) combi.transform).setLinesOutput(0);
-      ((BaseTransform<?,?>) combi.transform).setLinesWritten(0);
-      ((BaseTransform<?,?>) combi.transform).setLinesRead(0);
-      ((BaseTransform<?,?>) combi.transform).setLinesSkipped(0);
-      ((BaseTransform<?,?>) combi.transform).setLinesUpdated(0);
+      ((BaseTransform<?, ?>) combi.transform).setLinesInput(0);
+      ((BaseTransform<?, ?>) combi.transform).setLinesOutput(0);
+      ((BaseTransform<?, ?>) combi.transform).setLinesWritten(0);
+      ((BaseTransform<?, ?>) combi.transform).setLinesRead(0);
+      ((BaseTransform<?, ?>) combi.transform).setLinesSkipped(0);
+      ((BaseTransform<?, ?>) combi.transform).setLinesUpdated(0);
       combi.transform.setLinesRejected(0);
     }
   }
@@ -460,6 +461,18 @@ public class SingleThreadedPipelineExecutor {
     return pipeline.isStopped();
   }
 
+  public void startBundle() throws HopException {
+    for (TransformMetaDataCombi combi : pipeline.getTransforms()) {
+      combi.transform.startBundle();
+    }
+  }
+
+  public void finishBundle() throws HopException {
+    for (TransformMetaDataCombi combi : pipeline.getTransforms()) {
+      combi.transform.finishBundle();
+    }
+  }
+
   public void dispose() throws HopException {
 
     // Call output done.
diff --git a/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java b/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java
index 8b1d22fe79..25c2ce3884 100644
--- a/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java
+++ b/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java
@@ -3679,14 +3679,18 @@ public class BaseTransform<Meta extends ITransformMeta, Data extends ITransformD
     this.containerObjectId = containerObjectId;
   }
 
-  /*
-   * (non-Javadoc)
-   *
+  /**
    * @see org.apache.hop.pipeline.transform.ITransform#batchComplete()
    */
   @Override
   public void batchComplete() throws HopException {}
 
+  @Override
+  public void startBundle() throws HopException {}
+
+  @Override
+  public void finishBundle() throws HopException {}
+
   /**
    * Returns the registration date
    *
diff --git a/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransformData.java b/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransformData.java
index 0f3ac09e44..8518f990ca 100644
--- a/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransformData.java
+++ b/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransformData.java
@@ -30,6 +30,12 @@ public abstract class BaseTransformData implements ITransformData {
   /** The status. */
   private ComponentExecutionStatus status;
 
+  /** Set to true if the transform is running in a Beam pipeline */
+  private boolean beamContext;
+
+  /** The Beam bundle number */
+  private int beamBundleNr;
+
   /** Instantiates a new base transform data. */
   public BaseTransformData() {
     status = ComponentExecutionStatus.STATUS_EMPTY;
@@ -123,4 +129,40 @@ public abstract class BaseTransformData implements ITransformData {
   public boolean isDisposed() {
     return status == ComponentExecutionStatus.STATUS_DISPOSED;
   }
+
+  /**
+   * Gets beamContext
+   *
+   * @return value of beamContext
+   */
+  public boolean isBeamContext() {
+    return beamContext;
+  }
+
+  /**
+   * Sets beamContext
+   *
+   * @param beamContext value of beamContext
+   */
+  public void setBeamContext(boolean beamContext) {
+    this.beamContext = beamContext;
+  }
+
+  /**
+   * Gets beamBundleNr
+   *
+   * @return value of beamBundleNr
+   */
+  public int getBeamBundleNr() {
+    return beamBundleNr;
+  }
+
+  /**
+   * Sets beamBundleNr
+   *
+   * @param beamBundleNr value of beamBundleNr
+   */
+  public void setBeamBundleNr(int beamBundleNr) {
+    this.beamBundleNr = beamBundleNr;
+  }
 }
diff --git a/engine/src/main/java/org/apache/hop/pipeline/transform/ITransform.java b/engine/src/main/java/org/apache/hop/pipeline/transform/ITransform.java
index 4b452a84e0..d1bb0af3ce 100644
--- a/engine/src/main/java/org/apache/hop/pipeline/transform/ITransform.java
+++ b/engine/src/main/java/org/apache/hop/pipeline/transform/ITransform.java
@@ -295,14 +295,27 @@ public interface ITransform
   void setRepartitioning(int partitioningMethod);
 
   /**
-   * Calling this method will alert the transform that we finished passing a batch of records to the
-   * transform. Specifically for transforms like "Sort Rows" it means that the buffered rows can be
-   * sorted and passed on.
+   * When using the Single threaded engine this signals to the transform that a batch of records has been processed
+   * and that no more are expected in this batch.
    *
    * @throws HopException In case an error occurs during the processing of the batch of rows.
    */
   void batchComplete() throws HopException;
 
+  /**
+   * When running in a Beam context this signals the transform that a new bundle was started.
+   * File writing transforms might want to create new files here.
+   * @throws HopException
+   */
+  void startBundle() throws HopException;
+
+  /**
+   * When running in a Beam context this signals the transform that a bundle was finished.
+   * File writing transforms will want to close open file(s) here.
+   * @throws HopException
+   */
+  void finishBundle() throws HopException;
+
   /**
    * Pass along the metadata to use when loading external elements at runtime.
    *
diff --git a/engine/src/main/java/org/apache/hop/pipeline/transform/ITransformData.java b/engine/src/main/java/org/apache/hop/pipeline/transform/ITransformData.java
index b7d7875e15..7ed71de9c0 100644
--- a/engine/src/main/java/org/apache/hop/pipeline/transform/ITransformData.java
+++ b/engine/src/main/java/org/apache/hop/pipeline/transform/ITransformData.java
@@ -82,4 +82,33 @@ public interface ITransformData {
    * @return true, if is disposed
    */
   boolean isDisposed();
+
+
+  /**
+   * Gets beamContext
+   *
+   * @return value of beamContext
+   */
+  public boolean isBeamContext();
+
+  /**
+   * Sets beamContext
+   *
+   * @param beamContext value of beamContext
+   */
+  public void setBeamContext(boolean beamContext);
+
+  /**
+   * Gets beamBundleNr
+   *
+   * @return value of beamBundleNr
+   */
+  public int getBeamBundleNr();
+
+  /**
+   * Sets beamBundleNr
+   *
+   * @param beamBundleNr value of beamBundleNr
+   */
+  public void setBeamBundleNr(int beamBundleNr);
 }
diff --git a/integration-tests/beam_directrunner/0005-generate-one-json-file-validation.hpl b/integration-tests/beam_directrunner/0005-generate-single-file-validation.hpl
similarity index 81%
copy from integration-tests/beam_directrunner/0005-generate-one-json-file-validation.hpl
copy to integration-tests/beam_directrunner/0005-generate-single-file-validation.hpl
index 8020f7c124..fd82aa1768 100644
--- a/integration-tests/beam_directrunner/0005-generate-one-json-file-validation.hpl
+++ b/integration-tests/beam_directrunner/0005-generate-single-file-validation.hpl
@@ -19,7 +19,7 @@ limitations under the License.
 -->
 <pipeline>
   <info>
-    <name>0005-generate-one-json-file-validation</name>
+    <name>0005-generate-single-file-validation</name>
     <name_sync_with_filename>Y</name_sync_with_filename>
     <description/>
     <extended_description/>
@@ -41,23 +41,33 @@ limitations under the License.
   </notepads>
   <order>
     <hop>
-      <from>/tmp/0005/0005-*.json</from>
+      <from>/tmp/0005/*.json</from>
       <to>json</to>
       <enabled>Y</enabled>
     </hop>
     <hop>
-      <from>/tmp/0005/0005-*.csv</from>
+      <from>/tmp/0005/*.csv</from>
       <to>cvs</to>
       <enabled>Y</enabled>
     </hop>
     <hop>
-      <from>/tmp/0005/0005-*.xlsx</from>
+      <from>/tmp/0005/*.xlsx</from>
       <to>xlsx</to>
       <enabled>Y</enabled>
     </hop>
+    <hop>
+      <from>/tmp/0005/*.parquet</from>
+      <to>read Parquet file</to>
+      <enabled>Y</enabled>
+    </hop>
+    <hop>
+      <from>read Parquet file</from>
+      <to>parquet</to>
+      <enabled>Y</enabled>
+    </hop>
   </order>
   <transform>
-    <name>/tmp/0005/0005-*.csv</name>
+    <name>/tmp/0005/*.csv</name>
     <type>TextFileInput2</type>
     <description/>
     <distribute>Y</distribute>
@@ -96,7 +106,7 @@ limitations under the License.
     <add_to_result_filenames>Y</add_to_result_filenames>
     <file>
       <name>${java.io.tmpdir}/0005/</name>
-      <filemask>0005-.*\.csv</filemask>
+      <filemask>.*\.csv</filemask>
       <exclude_filemask/>
       <file_required>N</file_required>
       <include_subfolders>N</include_subfolders>
@@ -284,12 +294,12 @@ limitations under the License.
     <sizeFieldName/>
     <attributes/>
     <GUI>
-      <xloc>160</xloc>
-      <yloc>144</yloc>
+      <xloc>144</xloc>
+      <yloc>128</yloc>
     </GUI>
   </transform>
   <transform>
-    <name>/tmp/0005/0005-*.json</name>
+    <name>/tmp/0005/*.json</name>
     <type>JsonInput</type>
     <description/>
     <distribute>Y</distribute>
@@ -463,12 +473,45 @@ limitations under the License.
     <sizeFieldName/>
     <attributes/>
     <GUI>
-      <xloc>160</xloc>
-      <yloc>64</yloc>
+      <xloc>144</xloc>
+      <yloc>48</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>/tmp/0005/*.parquet</name>
+    <type>GetFileNames</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <isaddresult>Y</isaddresult>
+    <doNotFailIfNoFile>N</doNotFailIfNoFile>
+    <dynamic_include_subfolders>N</dynamic_include_subfolders>
+    <filefield>N</filefield>
+    <file>
+      <filemask>.*\.parquet</filemask>
+      <name>${java.io.tmpdir}/0005/</name>
+      <file_required>N</file_required>
+      <include_subfolders>N</include_subfolders>
+    </file>
+    <filter>
+      <filterfiletype>all_files</filterfiletype>
+    </filter>
+    <rownum>N</rownum>
+    <raiseAnExceptionIfNoFile>N</raiseAnExceptionIfNoFile>
+    <limit>0</limit>
+    <attributes/>
+    <GUI>
+      <xloc>144</xloc>
+      <yloc>288</yloc>
     </GUI>
   </transform>
   <transform>
-    <name>/tmp/0005/0005-*.xlsx</name>
+    <name>/tmp/0005/*.xlsx</name>
     <type>ExcelInput</type>
     <description/>
     <distribute>Y</distribute>
@@ -495,7 +538,7 @@ limitations under the License.
     <accept_transform_name/>
     <file>
       <name>${java.io.tmpdir}/0005/</name>
-      <filemask>0005-.*\.xlsx</filemask>
+      <filemask>.*\.xlsx</filemask>
       <exclude_filemask/>
       <file_required>N</file_required>
       <include_subfolders>N</include_subfolders>
@@ -649,8 +692,8 @@ limitations under the License.
     <spreadsheet_type>POI</spreadsheet_type>
     <attributes/>
     <GUI>
-      <xloc>160</xloc>
-      <yloc>224</yloc>
+      <xloc>144</xloc>
+      <yloc>208</yloc>
     </GUI>
   </transform>
   <transform>
@@ -666,8 +709,8 @@ limitations under the License.
     </partitioning>
     <attributes/>
     <GUI>
-      <xloc>352</xloc>
-      <yloc>144</yloc>
+      <xloc>512</xloc>
+      <yloc>128</yloc>
     </GUI>
   </transform>
   <transform>
@@ -683,8 +726,95 @@ limitations under the License.
     </partitioning>
     <attributes/>
     <GUI>
-      <xloc>352</xloc>
-      <yloc>64</yloc>
+      <xloc>512</xloc>
+      <yloc>48</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>parquet</name>
+    <type>Dummy</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <attributes/>
+    <GUI>
+      <xloc>512</xloc>
+      <yloc>288</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>read Parquet file</name>
+    <type>ParquetFileInput</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <fields>
+      <field>
+        <source_field>id</source_field>
+        <target_field>id</target_field>
+        <target_type>Integer</target_type>
+      </field>
+      <field>
+        <source_field>lastName</source_field>
+        <target_field>lastName</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>firstName</source_field>
+        <target_field>firstName</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>zipCode</source_field>
+        <target_field>zipCode</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>city</source_field>
+        <target_field>city</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>birthdate</source_field>
+        <target_field>birthdate</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>street</source_field>
+        <target_field>street</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>housenr</source_field>
+        <target_field>housenr</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>stateCode</source_field>
+        <target_field>stateCode</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>state</source_field>
+        <target_field>state</target_field>
+        <target_type>String</target_type>
+      </field>
+    </fields>
+    <filename_field>filename</filename_field>
+    <attributes/>
+    <GUI>
+      <xloc>320</xloc>
+      <yloc>288</yloc>
     </GUI>
   </transform>
   <transform>
@@ -700,8 +830,8 @@ limitations under the License.
     </partitioning>
     <attributes/>
     <GUI>
-      <xloc>352</xloc>
-      <yloc>224</yloc>
+      <xloc>512</xloc>
+      <yloc>208</yloc>
     </GUI>
   </transform>
   <transform_error_handling>
diff --git a/integration-tests/beam_directrunner/0005-generate-one-json-file.hpl b/integration-tests/beam_directrunner/0005-generate-single-file.hpl
similarity index 74%
copy from integration-tests/beam_directrunner/0005-generate-one-json-file.hpl
copy to integration-tests/beam_directrunner/0005-generate-single-file.hpl
index 0dcd3e3641..d337409555 100644
--- a/integration-tests/beam_directrunner/0005-generate-one-json-file.hpl
+++ b/integration-tests/beam_directrunner/0005-generate-single-file.hpl
@@ -19,7 +19,7 @@ limitations under the License.
 -->
 <pipeline>
   <info>
-    <name>0005-generate-one-json-file</name>
+    <name>0005-generate-single-file</name>
     <name_sync_with_filename>Y</name_sync_with_filename>
     <description/>
     <extended_description/>
@@ -41,33 +41,79 @@ limitations under the License.
   </notepads>
   <order>
     <hop>
-      <from>input/customers-noheader-1k.txt</from>
+      <from>parking/customers-noheader-1k.txt</from>
       <to>FL and CA</to>
       <enabled>Y</enabled>
     </hop>
     <hop>
       <from>FL and CA</from>
-      <to>0005-customers-ca-fl.json</to>
+      <to>customers-ca-fl.json</to>
       <enabled>Y</enabled>
     </hop>
     <hop>
       <from>FL and CA</from>
-      <to>0005-customers-ca-fl.csv</to>
+      <to>customers-ca-fl.csv</to>
       <enabled>Y</enabled>
     </hop>
     <hop>
       <from>FL and CA</from>
-      <to>0007-customers-ca-fl.xlsx</to>
+      <to>customers-ca-fl.xlsx</to>
+      <enabled>Y</enabled>
+    </hop>
+    <hop>
+      <from>FL and CA</from>
+      <to>customers-fl-ca.parquet</to>
       <enabled>Y</enabled>
     </hop>
   </order>
   <transform>
-    <name>0005-customers-ca-fl.csv</name>
+    <name>FL and CA</name>
+    <type>FilterRows</type>
+    <description/>
+    <distribute>N</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <send_true_to/>
+    <send_false_to/>
+    <compare>
+      <condition>
+        <negated>N</negated>
+        <conditions>
+          <condition>
+            <negated>N</negated>
+            <leftvalue>stateCode</leftvalue>
+            <function>IN LIST</function>
+            <rightvalue/>
+            <value>
+              <name>constant</name>
+              <type>String</type>
+              <text>FL;CA</text>
+              <length>-1</length>
+              <precision>-1</precision>
+              <isnull>N</isnull>
+              <mask/>
+            </value>
+          </condition>
+        </conditions>
+      </condition>
+    </compare>
+    <attributes/>
+    <GUI>
+      <xloc>336</xloc>
+      <yloc>64</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>customers-ca-fl.csv</name>
     <type>TextFileOutput</type>
     <description/>
     <distribute>Y</distribute>
     <custom_distribution/>
-    <copies>BEAM_SINGLE</copies>
+    <copies>SINGLE_BEAM</copies>
     <partitioning>
       <method>none</method>
       <schema_name/>
@@ -86,9 +132,9 @@ limitations under the License.
     <fileNameField/>
     <create_parent_folder>Y</create_parent_folder>
     <file>
-      <name>${java.io.tmpdir}/0005/0005-customers-ca-fl-${Internal.Transform.ID}</name>
+      <name>${java.io.tmpdir}/0005/customers-ca-fl</name>
       <servlet_output>N</servlet_output>
-      <do_not_open_new_file_init>N</do_not_open_new_file_init>
+      <do_not_open_new_file_init>Y</do_not_open_new_file_init>
       <extention>csv</extention>
       <append>N</append>
       <split>N</split>
@@ -226,17 +272,17 @@ limitations under the License.
     </fields>
     <attributes/>
     <GUI>
-      <xloc>592</xloc>
-      <yloc>224</yloc>
+      <xloc>720</xloc>
+      <yloc>112</yloc>
     </GUI>
   </transform>
   <transform>
-    <name>0005-customers-ca-fl.json</name>
+    <name>customers-ca-fl.json</name>
     <type>JsonOutput</type>
     <description/>
     <distribute>Y</distribute>
     <custom_distribution/>
-    <copies>BEAM_SINGLE</copies>
+    <copies>SINGLE_BEAM</copies>
     <partitioning>
       <method>none</method>
       <schema_name/>
@@ -244,11 +290,12 @@ limitations under the License.
     <addToResult>N</addToResult>
     <createParentFolder>Y</createParentFolder>
     <dateInFilename>N</dateInFilename>
+    <doNotOpenNewFileInit>Y</doNotOpenNewFileInit>
     <encoding>UTF-8</encoding>
     <extension>json</extension>
     <fileAppended>N</fileAppended>
     <fileAsCommand>N</fileAsCommand>
-    <fileName>${java.io.tmpdir}/0005/0005-customers-ca-fl-${Internal.Transform.ID}.json</fileName>
+    <fileName>${java.io.tmpdir}/0005/customers-ca-fl</fileName>
     <jsonBloc>customers</jsonBloc>
     <nrRowsInBloc>5000</nrRowsInBloc>
     <operation_type>writetofile</operation_type>
@@ -301,17 +348,17 @@ limitations under the License.
     <transformNrInFilename>N</transformNrInFilename>
     <attributes/>
     <GUI>
-      <xloc>592</xloc>
-      <yloc>96</yloc>
+      <xloc>624</xloc>
+      <yloc>144</yloc>
     </GUI>
   </transform>
   <transform>
-    <name>0007-customers-ca-fl.xlsx</name>
+    <name>customers-ca-fl.xlsx</name>
     <type>TypeExitExcelWriterTransform</type>
     <description/>
     <distribute>Y</distribute>
     <custom_distribution/>
-    <copies>SINGLE_BEAM</copies>
+    <copies>BEAM_SINGLE</copies>
     <partitioning>
       <method>none</method>
       <schema_name/>
@@ -325,9 +372,9 @@ limitations under the License.
       <autosizecolums>Y</autosizecolums>
       <createParentFolder>Y</createParentFolder>
       <add_date>N</add_date>
-      <do_not_open_newfile_init>N</do_not_open_newfile_init>
+      <do_not_open_newfile_init>Y</do_not_open_newfile_init>
       <extension>xlsx</extension>
-      <name>${java.io.tmpdir}/0005/0005-customers-ca-fl-${Internal.Transform.ID}</name>
+      <name>${java.io.tmpdir}/0005/customers-ca-fl</name>
       <filename_in_field>N</filename_in_field>
       <if_file_exists>new</if_file_exists>
       <if_sheet_exists>new</if_sheet_exists>
@@ -417,48 +464,86 @@ limitations under the License.
     </template>
     <attributes/>
     <GUI>
-      <xloc>592</xloc>
-      <yloc>352</yloc>
+      <xloc>816</xloc>
+      <yloc>64</yloc>
     </GUI>
   </transform>
   <transform>
-    <name>FL and CA</name>
-    <type>FilterRows</type>
+    <name>customers-fl-ca.parquet</name>
+    <type>ParquetFileOutput</type>
     <description/>
-    <distribute>N</distribute>
+    <distribute>Y</distribute>
     <custom_distribution/>
-    <copies>1</copies>
+    <copies>SINGLE_BEAM</copies>
     <partitioning>
       <method>none</method>
       <schema_name/>
     </partitioning>
-    <send_true_to/>
-    <send_false_to/>
-    <compare>
-      <condition>
-        <negated>N</negated>
-        <leftvalue>stateCode</leftvalue>
-        <function>IN LIST</function>
-        <rightvalue/>
-        <value>
-          <name>constant</name>
-          <type>String</type>
-          <text>FL;CA</text>
-          <length>-1</length>
-          <precision>-1</precision>
-          <isnull>N</isnull>
-          <mask/>
-        </value>
-      </condition>
-    </compare>
+    <compression_codec>UNCOMPRESSED</compression_codec>
+    <data_page_size>1048576</data_page_size>
+    <dictionary_page_size>1048576</dictionary_page_size>
+    <fields>
+      <field>
+        <source_field>id</source_field>
+        <target_field>id</target_field>
+      </field>
+      <field>
+        <source_field>Last name</source_field>
+        <target_field>lastName</target_field>
+      </field>
+      <field>
+        <source_field>First name</source_field>
+        <target_field>firstName</target_field>
+      </field>
+      <field>
+        <source_field>cust_zip_code</source_field>
+        <target_field>zipCode</target_field>
+      </field>
+      <field>
+        <source_field>city</source_field>
+        <target_field>city</target_field>
+      </field>
+      <field>
+        <source_field>birthdate</source_field>
+        <target_field>birthdate</target_field>
+      </field>
+      <field>
+        <source_field>street</source_field>
+        <target_field>street</target_field>
+      </field>
+      <field>
+        <source_field>housenr</source_field>
+        <target_field>housenr</target_field>
+      </field>
+      <field>
+        <source_field>stateCode</source_field>
+        <target_field>stateCode</target_field>
+      </field>
+      <field>
+        <source_field>state</source_field>
+        <target_field>state</target_field>
+      </field>
+    </fields>
+    <filename_split_size>1000000</filename_split_size>
+    <filename_base>${java.io.tmpdir}/0005/customers-fl-ca</filename_base>
+    <filename_create_parent_folders>Y</filename_create_parent_folders>
+    <filename_datetime_format>yyyyMMdd-HHmmss</filename_datetime_format>
+    <filename_ext>parquet</filename_ext>
+    <filename_include_copy>N</filename_include_copy>
+    <filename_include_date>N</filename_include_date>
+    <filename_include_datetime>N</filename_include_datetime>
+    <filename_include_split>Y</filename_include_split>
+    <filename_include_time>N</filename_include_time>
+    <row_group_size>20000</row_group_size>
+    <version>2.0</version>
     <attributes/>
     <GUI>
-      <xloc>336</xloc>
-      <yloc>224</yloc>
+      <xloc>512</xloc>
+      <yloc>176</yloc>
     </GUI>
   </transform>
   <transform>
-    <name>input/customers-noheader-1k.txt</name>
+    <name>parking/customers-noheader-1k.txt</name>
     <type>BeamInput</type>
     <description/>
     <distribute>Y</distribute>
@@ -473,7 +558,7 @@ limitations under the License.
     <attributes/>
     <GUI>
       <xloc>144</xloc>
-      <yloc>224</yloc>
+      <yloc>64</yloc>
     </GUI>
   </transform>
   <transform_error_handling>
diff --git a/integration-tests/beam_directrunner/0005-generate-one-json-file-validation.hpl b/integration-tests/beam_directrunner/0006-generate-bundle-files-validation.hpl
similarity index 81%
rename from integration-tests/beam_directrunner/0005-generate-one-json-file-validation.hpl
rename to integration-tests/beam_directrunner/0006-generate-bundle-files-validation.hpl
index 8020f7c124..d9dbed5b3d 100644
--- a/integration-tests/beam_directrunner/0005-generate-one-json-file-validation.hpl
+++ b/integration-tests/beam_directrunner/0006-generate-bundle-files-validation.hpl
@@ -19,7 +19,7 @@ limitations under the License.
 -->
 <pipeline>
   <info>
-    <name>0005-generate-one-json-file-validation</name>
+    <name>0006-generate-bundle-files-validation</name>
     <name_sync_with_filename>Y</name_sync_with_filename>
     <description/>
     <extended_description/>
@@ -41,23 +41,33 @@ limitations under the License.
   </notepads>
   <order>
     <hop>
-      <from>/tmp/0005/0005-*.json</from>
+      <from>/tmp/0006/*.json</from>
       <to>json</to>
       <enabled>Y</enabled>
     </hop>
     <hop>
-      <from>/tmp/0005/0005-*.csv</from>
+      <from>/tmp/0006/*.csv</from>
       <to>cvs</to>
       <enabled>Y</enabled>
     </hop>
     <hop>
-      <from>/tmp/0005/0005-*.xlsx</from>
+      <from>/tmp/0006/*.xlsx</from>
       <to>xlsx</to>
       <enabled>Y</enabled>
     </hop>
+    <hop>
+      <from>/tmp/0006/*.parquet</from>
+      <to>read Parquet file</to>
+      <enabled>Y</enabled>
+    </hop>
+    <hop>
+      <from>read Parquet file</from>
+      <to>parquet</to>
+      <enabled>Y</enabled>
+    </hop>
   </order>
   <transform>
-    <name>/tmp/0005/0005-*.csv</name>
+    <name>/tmp/0006/*.csv</name>
     <type>TextFileInput2</type>
     <description/>
     <distribute>Y</distribute>
@@ -95,8 +105,8 @@ limitations under the License.
     <length>Characters</length>
     <add_to_result_filenames>Y</add_to_result_filenames>
     <file>
-      <name>${java.io.tmpdir}/0005/</name>
-      <filemask>0005-.*\.csv</filemask>
+      <name>${java.io.tmpdir}/0006/</name>
+      <filemask>.*\.csv</filemask>
       <exclude_filemask/>
       <file_required>N</file_required>
       <include_subfolders>N</include_subfolders>
@@ -289,7 +299,7 @@ limitations under the License.
     </GUI>
   </transform>
   <transform>
-    <name>/tmp/0005/0005-*.json</name>
+    <name>/tmp/0006/*.json</name>
     <type>JsonInput</type>
     <description/>
     <distribute>Y</distribute>
@@ -311,7 +321,7 @@ limitations under the License.
     <defaultPathLeafToNull>Y</defaultPathLeafToNull>
     <rownum_field/>
     <file>
-      <name>${java.io.tmpdir}/0005/</name>
+      <name>${java.io.tmpdir}/0006/</name>
       <filemask>.*\.json</filemask>
       <exclude_filemask/>
       <file_required>N</file_required>
@@ -468,7 +478,7 @@ limitations under the License.
     </GUI>
   </transform>
   <transform>
-    <name>/tmp/0005/0005-*.xlsx</name>
+    <name>/tmp/0006/*.xlsx</name>
     <type>ExcelInput</type>
     <description/>
     <distribute>Y</distribute>
@@ -494,8 +504,8 @@ limitations under the License.
     <accept_field/>
     <accept_transform_name/>
     <file>
-      <name>${java.io.tmpdir}/0005/</name>
-      <filemask>0005-.*\.xlsx</filemask>
+      <name>${java.io.tmpdir}/0006/</name>
+      <filemask>.*\.xlsx</filemask>
       <exclude_filemask/>
       <file_required>N</file_required>
       <include_subfolders>N</include_subfolders>
@@ -653,6 +663,44 @@ limitations under the License.
       <yloc>224</yloc>
     </GUI>
   </transform>
+  <transform>
+    <name>/tmp/0006/*.parquet</name>
+    <type>GetFileNames</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <isaddresult>Y</isaddresult>
+    <doNotFailIfNoFile>N</doNotFailIfNoFile>
+    <exclude_wildcard_Field/>
+    <filename_Field/>
+    <dynamic_include_subfolders>N</dynamic_include_subfolders>
+    <wildcard_Field/>
+    <filefield>N</filefield>
+    <file>
+      <exclude_filemask/>
+      <filemask>.*\.parquet</filemask>
+      <name>${java.io.tmpdir}/0006/</name>
+      <file_required>N</file_required>
+      <include_subfolders>N</include_subfolders>
+    </file>
+    <filter>
+      <filterfiletype>all_files</filterfiletype>
+    </filter>
+    <rownum>N</rownum>
+    <raiseAnExceptionIfNoFile>N</raiseAnExceptionIfNoFile>
+    <limit>0</limit>
+    <rownum_field/>
+    <attributes/>
+    <GUI>
+      <xloc>160</xloc>
+      <yloc>304</yloc>
+    </GUI>
+  </transform>
   <transform>
     <name>cvs</name>
     <type>Dummy</type>
@@ -666,7 +714,7 @@ limitations under the License.
     </partitioning>
     <attributes/>
     <GUI>
-      <xloc>352</xloc>
+      <xloc>528</xloc>
       <yloc>144</yloc>
     </GUI>
   </transform>
@@ -683,10 +731,97 @@ limitations under the License.
     </partitioning>
     <attributes/>
     <GUI>
-      <xloc>352</xloc>
+      <xloc>528</xloc>
       <yloc>64</yloc>
     </GUI>
   </transform>
+  <transform>
+    <name>parquet</name>
+    <type>Dummy</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <attributes/>
+    <GUI>
+      <xloc>528</xloc>
+      <yloc>304</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>read Parquet file</name>
+    <type>ParquetFileInput</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <fields>
+      <field>
+        <source_field>id</source_field>
+        <target_field>id</target_field>
+        <target_type>Integer</target_type>
+      </field>
+      <field>
+        <source_field>lastName</source_field>
+        <target_field>lastName</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>firstName</source_field>
+        <target_field>firstName</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>zipCode</source_field>
+        <target_field>zipCode</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>city</source_field>
+        <target_field>city</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>birthdate</source_field>
+        <target_field>birthdate</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>street</source_field>
+        <target_field>street</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>housenr</source_field>
+        <target_field>housenr</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>stateCode</source_field>
+        <target_field>stateCode</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>state</source_field>
+        <target_field>state</target_field>
+        <target_type>String</target_type>
+      </field>
+    </fields>
+    <filename_field>filename</filename_field>
+    <attributes/>
+    <GUI>
+      <xloc>336</xloc>
+      <yloc>304</yloc>
+    </GUI>
+  </transform>
   <transform>
     <name>xlsx</name>
     <type>Dummy</type>
@@ -700,7 +835,7 @@ limitations under the License.
     </partitioning>
     <attributes/>
     <GUI>
-      <xloc>352</xloc>
+      <xloc>528</xloc>
       <yloc>224</yloc>
     </GUI>
   </transform>
diff --git a/integration-tests/beam_directrunner/0005-generate-one-json-file.hpl b/integration-tests/beam_directrunner/0006-generate-bundle-files.hpl
similarity index 74%
rename from integration-tests/beam_directrunner/0005-generate-one-json-file.hpl
rename to integration-tests/beam_directrunner/0006-generate-bundle-files.hpl
index 0dcd3e3641..f2f8ef86e3 100644
--- a/integration-tests/beam_directrunner/0005-generate-one-json-file.hpl
+++ b/integration-tests/beam_directrunner/0006-generate-bundle-files.hpl
@@ -19,7 +19,7 @@ limitations under the License.
 -->
 <pipeline>
   <info>
-    <name>0005-generate-one-json-file</name>
+    <name>0006-generate-bundle-files</name>
     <name_sync_with_filename>Y</name_sync_with_filename>
     <description/>
     <extended_description/>
@@ -41,33 +41,79 @@ limitations under the License.
   </notepads>
   <order>
     <hop>
-      <from>input/customers-noheader-1k.txt</from>
+      <from>parking/customers-noheader-1k.txt</from>
       <to>FL and CA</to>
       <enabled>Y</enabled>
     </hop>
     <hop>
       <from>FL and CA</from>
-      <to>0005-customers-ca-fl.json</to>
+      <to>customers-ca-fl.json</to>
       <enabled>Y</enabled>
     </hop>
     <hop>
       <from>FL and CA</from>
-      <to>0005-customers-ca-fl.csv</to>
+      <to>customers-ca-fl.csv</to>
       <enabled>Y</enabled>
     </hop>
     <hop>
       <from>FL and CA</from>
-      <to>0007-customers-ca-fl.xlsx</to>
+      <to>customers-ca-fl.xlsx</to>
+      <enabled>Y</enabled>
+    </hop>
+    <hop>
+      <from>FL and CA</from>
+      <to>customers-fl-ca.parquet</to>
       <enabled>Y</enabled>
     </hop>
   </order>
   <transform>
-    <name>0005-customers-ca-fl.csv</name>
+    <name>FL and CA</name>
+    <type>FilterRows</type>
+    <description/>
+    <distribute>N</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <send_true_to/>
+    <send_false_to/>
+    <compare>
+      <condition>
+        <negated>N</negated>
+        <conditions>
+          <condition>
+            <negated>N</negated>
+            <leftvalue>stateCode</leftvalue>
+            <function>IN LIST</function>
+            <rightvalue/>
+            <value>
+              <name>constant</name>
+              <type>String</type>
+              <text>FL;CA</text>
+              <length>-1</length>
+              <precision>-1</precision>
+              <isnull>N</isnull>
+              <mask/>
+            </value>
+          </condition>
+        </conditions>
+      </condition>
+    </compare>
+    <attributes/>
+    <GUI>
+      <xloc>336</xloc>
+      <yloc>64</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>customers-ca-fl.csv</name>
     <type>TextFileOutput</type>
     <description/>
     <distribute>Y</distribute>
     <custom_distribution/>
-    <copies>BEAM_SINGLE</copies>
+    <copies>1</copies>
     <partitioning>
       <method>none</method>
       <schema_name/>
@@ -86,9 +132,9 @@ limitations under the License.
     <fileNameField/>
     <create_parent_folder>Y</create_parent_folder>
     <file>
-      <name>${java.io.tmpdir}/0005/0005-customers-ca-fl-${Internal.Transform.ID}</name>
+      <name>${java.io.tmpdir}/0006/customers-ca-fl</name>
       <servlet_output>N</servlet_output>
-      <do_not_open_new_file_init>N</do_not_open_new_file_init>
+      <do_not_open_new_file_init>Y</do_not_open_new_file_init>
       <extention>csv</extention>
       <append>N</append>
       <split>N</split>
@@ -226,17 +272,17 @@ limitations under the License.
     </fields>
     <attributes/>
     <GUI>
-      <xloc>592</xloc>
-      <yloc>224</yloc>
+      <xloc>752</xloc>
+      <yloc>96</yloc>
     </GUI>
   </transform>
   <transform>
-    <name>0005-customers-ca-fl.json</name>
+    <name>customers-ca-fl.json</name>
     <type>JsonOutput</type>
     <description/>
     <distribute>Y</distribute>
     <custom_distribution/>
-    <copies>BEAM_SINGLE</copies>
+    <copies>1</copies>
     <partitioning>
       <method>none</method>
       <schema_name/>
@@ -244,11 +290,12 @@ limitations under the License.
     <addToResult>N</addToResult>
     <createParentFolder>Y</createParentFolder>
     <dateInFilename>N</dateInFilename>
+    <doNotOpenNewFileInit>Y</doNotOpenNewFileInit>
     <encoding>UTF-8</encoding>
     <extension>json</extension>
     <fileAppended>N</fileAppended>
     <fileAsCommand>N</fileAsCommand>
-    <fileName>${java.io.tmpdir}/0005/0005-customers-ca-fl-${Internal.Transform.ID}.json</fileName>
+    <fileName>${java.io.tmpdir}/0006/customers-ca-fl</fileName>
     <jsonBloc>customers</jsonBloc>
     <nrRowsInBloc>5000</nrRowsInBloc>
     <operation_type>writetofile</operation_type>
@@ -301,17 +348,17 @@ limitations under the License.
     <transformNrInFilename>N</transformNrInFilename>
     <attributes/>
     <GUI>
-      <xloc>592</xloc>
-      <yloc>96</yloc>
+      <xloc>624</xloc>
+      <yloc>128</yloc>
     </GUI>
   </transform>
   <transform>
-    <name>0007-customers-ca-fl.xlsx</name>
+    <name>customers-ca-fl.xlsx</name>
     <type>TypeExitExcelWriterTransform</type>
     <description/>
     <distribute>Y</distribute>
     <custom_distribution/>
-    <copies>SINGLE_BEAM</copies>
+    <copies>1</copies>
     <partitioning>
       <method>none</method>
       <schema_name/>
@@ -325,9 +372,9 @@ limitations under the License.
       <autosizecolums>Y</autosizecolums>
       <createParentFolder>Y</createParentFolder>
       <add_date>N</add_date>
-      <do_not_open_newfile_init>N</do_not_open_newfile_init>
+      <do_not_open_newfile_init>Y</do_not_open_newfile_init>
       <extension>xlsx</extension>
-      <name>${java.io.tmpdir}/0005/0005-customers-ca-fl-${Internal.Transform.ID}</name>
+      <name>${java.io.tmpdir}/0006/customers-ca-fl</name>
       <filename_in_field>N</filename_in_field>
       <if_file_exists>new</if_file_exists>
       <if_sheet_exists>new</if_sheet_exists>
@@ -417,48 +464,86 @@ limitations under the License.
     </template>
     <attributes/>
     <GUI>
-      <xloc>592</xloc>
-      <yloc>352</yloc>
+      <xloc>864</xloc>
+      <yloc>64</yloc>
     </GUI>
   </transform>
   <transform>
-    <name>FL and CA</name>
-    <type>FilterRows</type>
+    <name>customers-fl-ca.parquet</name>
+    <type>ParquetFileOutput</type>
     <description/>
-    <distribute>N</distribute>
+    <distribute>Y</distribute>
     <custom_distribution/>
     <copies>1</copies>
     <partitioning>
       <method>none</method>
       <schema_name/>
     </partitioning>
-    <send_true_to/>
-    <send_false_to/>
-    <compare>
-      <condition>
-        <negated>N</negated>
-        <leftvalue>stateCode</leftvalue>
-        <function>IN LIST</function>
-        <rightvalue/>
-        <value>
-          <name>constant</name>
-          <type>String</type>
-          <text>FL;CA</text>
-          <length>-1</length>
-          <precision>-1</precision>
-          <isnull>N</isnull>
-          <mask/>
-        </value>
-      </condition>
-    </compare>
+    <compression_codec>UNCOMPRESSED</compression_codec>
+    <data_page_size>1048576</data_page_size>
+    <dictionary_page_size>1048576</dictionary_page_size>
+    <fields>
+      <field>
+        <source_field>id</source_field>
+        <target_field>id</target_field>
+      </field>
+      <field>
+        <source_field>Last name</source_field>
+        <target_field>lastName</target_field>
+      </field>
+      <field>
+        <source_field>First name</source_field>
+        <target_field>firstName</target_field>
+      </field>
+      <field>
+        <source_field>cust_zip_code</source_field>
+        <target_field>zipCode</target_field>
+      </field>
+      <field>
+        <source_field>city</source_field>
+        <target_field>city</target_field>
+      </field>
+      <field>
+        <source_field>birthdate</source_field>
+        <target_field>birthdate</target_field>
+      </field>
+      <field>
+        <source_field>street</source_field>
+        <target_field>street</target_field>
+      </field>
+      <field>
+        <source_field>housenr</source_field>
+        <target_field>housenr</target_field>
+      </field>
+      <field>
+        <source_field>stateCode</source_field>
+        <target_field>stateCode</target_field>
+      </field>
+      <field>
+        <source_field>state</source_field>
+        <target_field>state</target_field>
+      </field>
+    </fields>
+    <filename_split_size>1000000</filename_split_size>
+    <filename_base>${java.io.tmpdir}/0006/customers-fl-ca</filename_base>
+    <filename_create_parent_folders>Y</filename_create_parent_folders>
+    <filename_datetime_format>yyyyMMdd-HHmmss</filename_datetime_format>
+    <filename_ext>parquet</filename_ext>
+    <filename_include_copy>N</filename_include_copy>
+    <filename_include_date>N</filename_include_date>
+    <filename_include_datetime>N</filename_include_datetime>
+    <filename_include_split>Y</filename_include_split>
+    <filename_include_time>N</filename_include_time>
+    <row_group_size>20000</row_group_size>
+    <version>2.0</version>
     <attributes/>
     <GUI>
-      <xloc>336</xloc>
-      <yloc>224</yloc>
+      <xloc>512</xloc>
+      <yloc>160</yloc>
     </GUI>
   </transform>
   <transform>
-    <name>input/customers-noheader-1k.txt</name>
+    <name>parking/customers-noheader-1k.txt</name>
     <type>BeamInput</type>
     <description/>
     <distribute>Y</distribute>
@@ -473,7 +558,7 @@ limitations under the License.
     <attributes/>
     <GUI>
       <xloc>144</xloc>
-      <yloc>224</yloc>
+      <yloc>64</yloc>
     </GUI>
   </transform>
   <transform_error_handling>
diff --git a/integration-tests/beam_directrunner/main-0005-single-thread.hwf b/integration-tests/beam_directrunner/main-0005-single-thread.hwf
index 4e16a2336d..db94e1a5f9 100644
--- a/integration-tests/beam_directrunner/main-0005-single-thread.hwf
+++ b/integration-tests/beam_directrunner/main-0005-single-thread.hwf
@@ -44,7 +44,7 @@ limitations under the License.
       <weekDay>1</weekDay>
       <DayOfMonth>1</DayOfMonth>
       <parallel>N</parallel>
-      <xloc>144</xloc>
+      <xloc>112</xloc>
       <yloc>96</yloc>
       <attributes_hac/>
     </action>
@@ -73,7 +73,7 @@ limitations under the License.
       <attributes/>
       <test_names>
         <test_name>
-          <name>0005-generate-one-json-file-validation UNIT</name>
+          <name>0005-generate-single-file-validation UNIT</name>
         </test_name>
       </test_names>
       <parallel>N</parallel>
@@ -82,11 +82,11 @@ limitations under the License.
       <attributes_hac/>
     </action>
     <action>
-      <name>0005-generate-one-json-file</name>
+      <name>0005-generate-single-file.hpl</name>
       <description/>
       <type>PIPELINE</type>
       <attributes/>
-      <filename>${PROJECT_HOME}/0005-generate-one-json-file.hpl</filename>
+      <filename>${PROJECT_HOME}/0005-generate-single-file.hpl</filename>
       <params_from_previous>N</params_from_previous>
       <exec_per_row>N</exec_per_row>
       <clear_rows>N</clear_rows>
@@ -120,13 +120,13 @@ limitations under the License.
     </hop>
     <hop>
       <from>delete /tmp/0005/*</from>
-      <to>0005-generate-one-json-file</to>
+      <to>0005-generate-single-file.hpl</to>
       <enabled>Y</enabled>
       <evaluation>Y</evaluation>
       <unconditional>N</unconditional>
     </hop>
     <hop>
-      <from>0005-generate-one-json-file</from>
+      <from>0005-generate-single-file.hpl</from>
       <to>Run Pipeline Unit Tests</to>
       <enabled>Y</enabled>
       <evaluation>Y</evaluation>
diff --git a/integration-tests/beam_directrunner/main-0005-single-thread.hwf b/integration-tests/beam_directrunner/main-0006-generate-bundle-files.hwf
similarity index 88%
copy from integration-tests/beam_directrunner/main-0005-single-thread.hwf
copy to integration-tests/beam_directrunner/main-0006-generate-bundle-files.hwf
index 4e16a2336d..d69f4fa3b6 100644
--- a/integration-tests/beam_directrunner/main-0005-single-thread.hwf
+++ b/integration-tests/beam_directrunner/main-0006-generate-bundle-files.hwf
@@ -18,7 +18,7 @@ limitations under the License.
 
 -->
 <workflow>
-  <name>main-0005-single-thread</name>
+  <name>main-0006-generate-bundle-files</name>
   <name_sync_with_filename>Y</name_sync_with_filename>
   <description/>
   <extended_description/>
@@ -49,7 +49,7 @@ limitations under the License.
       <attributes_hac/>
     </action>
     <action>
-      <name>delete /tmp/0005/*</name>
+      <name>delete /tmp/0006/*</name>
       <description/>
       <type>DELETE_FILES</type>
       <attributes/>
@@ -57,7 +57,7 @@ limitations under the License.
       <include_subfolders>N</include_subfolders>
       <fields>
         <field>
-          <name>${java.io.tmpdir}/0005/</name>
+          <name>${java.io.tmpdir}/0006/</name>
           <filemask>.*</filemask>
         </field>
       </fields>
@@ -73,7 +73,7 @@ limitations under the License.
       <attributes/>
       <test_names>
         <test_name>
-          <name>0005-generate-one-json-file-validation UNIT</name>
+          <name>0006-generate-bundle-files-validation UNIT</name>
         </test_name>
       </test_names>
       <parallel>N</parallel>
@@ -82,11 +82,11 @@ limitations under the License.
       <attributes_hac/>
     </action>
     <action>
-      <name>0005-generate-one-json-file</name>
+      <name>0006-generate-bundle-files.hpl</name>
       <description/>
       <type>PIPELINE</type>
       <attributes/>
-      <filename>${PROJECT_HOME}/0005-generate-one-json-file.hpl</filename>
+      <filename>${PROJECT_HOME}/0006-generate-bundle-files.hpl</filename>
       <params_from_previous>N</params_from_previous>
       <exec_per_row>N</exec_per_row>
       <clear_rows>N</clear_rows>
@@ -113,20 +113,20 @@ limitations under the License.
   <hops>
     <hop>
       <from>Start</from>
-      <to>delete /tmp/0005/*</to>
+      <to>delete /tmp/0006/*</to>
       <enabled>Y</enabled>
       <evaluation>Y</evaluation>
       <unconditional>Y</unconditional>
     </hop>
     <hop>
-      <from>delete /tmp/0005/*</from>
-      <to>0005-generate-one-json-file</to>
+      <from>delete /tmp/0006/*</from>
+      <to>0006-generate-bundle-files.hpl</to>
       <enabled>Y</enabled>
       <evaluation>Y</evaluation>
       <unconditional>N</unconditional>
     </hop>
     <hop>
-      <from>0005-generate-one-json-file</from>
+      <from>0006-generate-bundle-files.hpl</from>
       <to>Run Pipeline Unit Tests</to>
       <enabled>Y</enabled>
       <evaluation>Y</evaluation>
diff --git a/integration-tests/beam_directrunner/metadata/unit-test/0005-generate-one-json-file-validation UNIT.json b/integration-tests/beam_directrunner/metadata/unit-test/0005-generate-single-file-validation UNIT.json
similarity index 74%
copy from integration-tests/beam_directrunner/metadata/unit-test/0005-generate-one-json-file-validation UNIT.json
copy to integration-tests/beam_directrunner/metadata/unit-test/0005-generate-single-file-validation UNIT.json
index e807d41644..b53bbb4463 100644
--- a/integration-tests/beam_directrunner/metadata/unit-test/0005-generate-one-json-file-validation UNIT.json	
+++ b/integration-tests/beam_directrunner/metadata/unit-test/0005-generate-single-file-validation UNIT.json	
@@ -60,11 +60,11 @@
           "data_set_field": "id"
         },
         {
-          "transform_field": "Last name",
+          "transform_field": "Last_name",
           "data_set_field": "Last name"
         },
         {
-          "transform_field": "First name",
+          "transform_field": "First_name",
           "data_set_field": "First name"
         },
         {
@@ -99,7 +99,7 @@
       "field_order": [
         "id"
       ],
-      "transform_name": "xlsx",
+      "transform_name": "cvs",
       "data_set_name": "0005-generate-one-json-file-golden"
     },
     {
@@ -109,11 +109,11 @@
           "data_set_field": "id"
         },
         {
-          "transform_field": "Last_name",
+          "transform_field": "Last name",
           "data_set_field": "Last name"
         },
         {
-          "transform_field": "First_name",
+          "transform_field": "First name",
           "data_set_field": "First name"
         },
         {
@@ -148,15 +148,64 @@
       "field_order": [
         "id"
       ],
-      "transform_name": "cvs",
+      "transform_name": "xlsx",
+      "data_set_name": "0005-generate-one-json-file-golden"
+    },
+    {
+      "field_mappings": [
+        {
+          "transform_field": "lastName",
+          "data_set_field": "Last name"
+        },
+        {
+          "transform_field": "firstName",
+          "data_set_field": "First name"
+        },
+        {
+          "transform_field": "birthdate",
+          "data_set_field": "birthdate"
+        },
+        {
+          "transform_field": "zipCode",
+          "data_set_field": "cust_zip_code"
+        },
+        {
+          "transform_field": "city",
+          "data_set_field": "city"
+        },
+        {
+          "transform_field": "id",
+          "data_set_field": "id"
+        },
+        {
+          "transform_field": "street",
+          "data_set_field": "street"
+        },
+        {
+          "transform_field": "housenr",
+          "data_set_field": "housenr"
+        },
+        {
+          "transform_field": "stateCode",
+          "data_set_field": "stateCode"
+        },
+        {
+          "transform_field": "state",
+          "data_set_field": "state"
+        }
+      ],
+      "field_order": [
+        "id"
+      ],
+      "transform_name": "parquet",
       "data_set_name": "0005-generate-one-json-file-golden"
     }
   ],
   "input_data_sets": [],
-  "name": "0005-generate-one-json-file-validation UNIT",
+  "name": "0005-generate-single-file-validation UNIT",
   "description": "",
   "trans_test_tweaks": [],
   "persist_filename": "",
-  "pipeline_filename": "./0005-generate-one-json-file-validation.hpl",
+  "pipeline_filename": "./0005-generate-single-file-validation.hpl",
   "test_type": "UNIT_TEST"
 }
\ No newline at end of file
diff --git a/integration-tests/beam_directrunner/metadata/unit-test/0005-generate-one-json-file-validation UNIT.json b/integration-tests/beam_directrunner/metadata/unit-test/0006-generate-bundle-files-validation UNIT.json
similarity index 74%
rename from integration-tests/beam_directrunner/metadata/unit-test/0005-generate-one-json-file-validation UNIT.json
rename to integration-tests/beam_directrunner/metadata/unit-test/0006-generate-bundle-files-validation UNIT.json
index e807d41644..e680a896eb 100644
--- a/integration-tests/beam_directrunner/metadata/unit-test/0005-generate-one-json-file-validation UNIT.json	
+++ b/integration-tests/beam_directrunner/metadata/unit-test/0006-generate-bundle-files-validation UNIT.json	
@@ -60,11 +60,11 @@
           "data_set_field": "id"
         },
         {
-          "transform_field": "Last name",
+          "transform_field": "Last_name",
           "data_set_field": "Last name"
         },
         {
-          "transform_field": "First name",
+          "transform_field": "First_name",
           "data_set_field": "First name"
         },
         {
@@ -99,7 +99,7 @@
       "field_order": [
         "id"
       ],
-      "transform_name": "xlsx",
+      "transform_name": "cvs",
       "data_set_name": "0005-generate-one-json-file-golden"
     },
     {
@@ -109,11 +109,11 @@
           "data_set_field": "id"
         },
         {
-          "transform_field": "Last_name",
+          "transform_field": "Last name",
           "data_set_field": "Last name"
         },
         {
-          "transform_field": "First_name",
+          "transform_field": "First name",
           "data_set_field": "First name"
         },
         {
@@ -148,15 +148,64 @@
       "field_order": [
         "id"
       ],
-      "transform_name": "cvs",
+      "transform_name": "xlsx",
+      "data_set_name": "0005-generate-one-json-file-golden"
+    },
+    {
+      "field_mappings": [
+        {
+          "transform_field": "id",
+          "data_set_field": "id"
+        },
+        {
+          "transform_field": "firstName",
+          "data_set_field": "First name"
+        },
+        {
+          "transform_field": "lastName",
+          "data_set_field": "Last name"
+        },
+        {
+          "transform_field": "zipCode",
+          "data_set_field": "cust_zip_code"
+        },
+        {
+          "transform_field": "city",
+          "data_set_field": "city"
+        },
+        {
+          "transform_field": "birthdate",
+          "data_set_field": "birthdate"
+        },
+        {
+          "transform_field": "street",
+          "data_set_field": "street"
+        },
+        {
+          "transform_field": "housenr",
+          "data_set_field": "housenr"
+        },
+        {
+          "transform_field": "stateCode",
+          "data_set_field": "stateCode"
+        },
+        {
+          "transform_field": "state",
+          "data_set_field": "state"
+        }
+      ],
+      "field_order": [
+        "id"
+      ],
+      "transform_name": "parquet",
       "data_set_name": "0005-generate-one-json-file-golden"
     }
   ],
   "input_data_sets": [],
-  "name": "0005-generate-one-json-file-validation UNIT",
+  "name": "0006-generate-bundle-files-validation UNIT",
   "description": "",
   "trans_test_tweaks": [],
   "persist_filename": "",
-  "pipeline_filename": "./0005-generate-one-json-file-validation.hpl",
+  "pipeline_filename": "./0006-generate-bundle-files-validation.hpl",
   "test_type": "UNIT_TEST"
 }
\ No newline at end of file
diff --git a/integration-tests/gcp/0007-single-thread-validation.hpl b/integration-tests/gcp/0007-single-thread-validation.hpl
index 7c3c86dab6..aa6b4ec2eb 100644
--- a/integration-tests/gcp/0007-single-thread-validation.hpl
+++ b/integration-tests/gcp/0007-single-thread-validation.hpl
@@ -55,6 +55,16 @@ limitations under the License.
       <to>xlsx</to>
       <enabled>Y</enabled>
     </hop>
+    <hop>
+      <from>gs://apache-hop-it/output/0007-*.parquet</from>
+      <to>read Parquet file</to>
+      <enabled>Y</enabled>
+    </hop>
+    <hop>
+      <from>read Parquet file</from>
+      <to>parquet</to>
+      <enabled>Y</enabled>
+    </hop>
   </order>
   <transform>
     <name>csv</name>
@@ -69,8 +79,8 @@ limitations under the License.
     </partitioning>
     <attributes/>
     <GUI>
-      <xloc>384</xloc>
-      <yloc>192</yloc>
+      <xloc>544</xloc>
+      <yloc>112</yloc>
     </GUI>
   </transform>
   <transform>
@@ -125,8 +135,8 @@ limitations under the License.
     <fields>
       <field>
         <name>id</name>
-        <type>Integer</type>
-        <format>0</format>
+        <type>String</type>
+        <format/>
         <currency/>
         <decimal/>
         <group/>
@@ -135,7 +145,7 @@ limitations under the License.
         <position>-1</position>
         <length>15</length>
         <precision>0</precision>
-        <trim_type>none</trim_type>
+        <trim_type>both</trim_type>
         <repeat>N</repeat>
       </field>
       <field>
@@ -301,8 +311,8 @@ limitations under the License.
     <sizeFieldName/>
     <attributes/>
     <GUI>
-      <xloc>192</xloc>
-      <yloc>192</yloc>
+      <xloc>176</xloc>
+      <yloc>112</yloc>
     </GUI>
   </transform>
   <transform>
@@ -480,8 +490,41 @@ limitations under the License.
     <sizeFieldName/>
     <attributes/>
     <GUI>
-      <xloc>192</xloc>
-      <yloc>96</yloc>
+      <xloc>176</xloc>
+      <yloc>32</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>gs://apache-hop-it/output/0007-*.parquet</name>
+    <type>GetFileNames</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <isaddresult>Y</isaddresult>
+    <doNotFailIfNoFile>N</doNotFailIfNoFile>
+    <dynamic_include_subfolders>N</dynamic_include_subfolders>
+    <filefield>N</filefield>
+    <file>
+      <filemask>0007-.*\.parquet</filemask>
+      <name>gs://apache-hop-it/output/</name>
+      <file_required>N</file_required>
+      <include_subfolders>N</include_subfolders>
+    </file>
+    <filter>
+      <filterfiletype>all_files</filterfiletype>
+    </filter>
+    <rownum>N</rownum>
+    <raiseAnExceptionIfNoFile>N</raiseAnExceptionIfNoFile>
+    <limit>0</limit>
+    <attributes/>
+    <GUI>
+      <xloc>176</xloc>
+      <yloc>272</yloc>
     </GUI>
   </transform>
   <transform>
@@ -666,8 +709,8 @@ limitations under the License.
     <spreadsheet_type>SAX_POI</spreadsheet_type>
     <attributes/>
     <GUI>
-      <xloc>192</xloc>
-      <yloc>288</yloc>
+      <xloc>176</xloc>
+      <yloc>192</yloc>
     </GUI>
   </transform>
   <transform>
@@ -682,9 +725,96 @@ limitations under the License.
       <schema_name/>
     </partitioning>
     <attributes/>
+    <GUI>
+      <xloc>544</xloc>
+      <yloc>32</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>parquet</name>
+    <type>Dummy</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <attributes/>
+    <GUI>
+      <xloc>544</xloc>
+      <yloc>272</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>read Parquet file</name>
+    <type>ParquetFileInput</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>1</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <fields>
+      <field>
+        <source_field>id</source_field>
+        <target_field>id</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>lastName</source_field>
+        <target_field>lastName</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>firstName</source_field>
+        <target_field>firstName</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>zipCode</source_field>
+        <target_field>zipCode</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>city</source_field>
+        <target_field>city</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>birthdate</source_field>
+        <target_field>birthdate</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>street</source_field>
+        <target_field>street</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>housenr</source_field>
+        <target_field>housenr</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>stateCode</source_field>
+        <target_field>stateCode</target_field>
+        <target_type>String</target_type>
+      </field>
+      <field>
+        <source_field>state</source_field>
+        <target_field>state</target_field>
+        <target_type>String</target_type>
+      </field>
+    </fields>
+    <filename_field>filename</filename_field>
+    <attributes/>
     <GUI>
       <xloc>384</xloc>
-      <yloc>96</yloc>
+      <yloc>272</yloc>
     </GUI>
   </transform>
   <transform>
@@ -700,8 +830,8 @@ limitations under the License.
     </partitioning>
     <attributes/>
     <GUI>
-      <xloc>384</xloc>
-      <yloc>288</yloc>
+      <xloc>544</xloc>
+      <yloc>192</yloc>
     </GUI>
   </transform>
   <transform_error_handling>
diff --git a/integration-tests/gcp/0007-single-thread.hpl b/integration-tests/gcp/0007-single-thread.hpl
index 9a470d760d..89e9112dfe 100644
--- a/integration-tests/gcp/0007-single-thread.hpl
+++ b/integration-tests/gcp/0007-single-thread.hpl
@@ -38,6 +38,28 @@ limitations under the License.
     <is_key_private>N</is_key_private>
   </info>
   <notepads>
+    <notepad>
+      <note>There is a google storage bug somewhere.
+Reading back a newly created xlsx file fails for some reason.
+</note>
+      <xloc>112</xloc>
+      <yloc>352</yloc>
+      <width>359</width>
+      <heigth>64</heigth>
+      <fontname>Noto Sans</fontname>
+      <fontsize>12</fontsize>
+      <fontbold>N</fontbold>
+      <fontitalic>N</fontitalic>
+      <fontcolorred>200</fontcolorred>
+      <fontcolorgreen>231</fontcolorgreen>
+      <fontcolorblue>250</fontcolorblue>
+      <backgroundcolorred>15</backgroundcolorred>
+      <backgroundcolorgreen>136</backgroundcolorgreen>
+      <backgroundcolorblue>210</backgroundcolorblue>
+      <bordercolorred>200</bordercolorred>
+      <bordercolorgreen>231</bordercolorgreen>
+      <bordercolorblue>250</bordercolorblue>
+    </notepad>
   </notepads>
   <order>
     <hop>
@@ -58,6 +80,11 @@ limitations under the License.
     <hop>
       <from>FL,CA</from>
       <to>0007-customers-ca-fl.xlsx</to>
+      <enabled>N</enabled>
+    </hop>
+    <hop>
+      <from>FL,CA</from>
+      <to>0007-customers-fl-ca.parquet</to>
       <enabled>Y</enabled>
     </hop>
   </order>
@@ -86,9 +113,9 @@ limitations under the License.
     <fileNameField/>
     <create_parent_folder>Y</create_parent_folder>
     <file>
-      <name>gs://apache-hop-it/output/0007-customers-ca-fl-${Internal.Transform.ID}</name>
+      <name>gs://apache-hop-it/output/0007-customers-ca-fl</name>
       <servlet_output>N</servlet_output>
-      <do_not_open_new_file_init>N</do_not_open_new_file_init>
+      <do_not_open_new_file_init>Y</do_not_open_new_file_init>
       <extention>csv</extention>
       <append>N</append>
       <split>N</split>
@@ -111,7 +138,7 @@ limitations under the License.
         <decimal>.</decimal>
         <group>,</group>
         <nullif/>
-        <trim_type>both</trim_type>
+        <trim_type>none</trim_type>
         <length>-1</length>
         <precision>-1</precision>
       </field>
@@ -123,7 +150,7 @@ limitations under the License.
         <decimal/>
         <group/>
         <nullif/>
-        <trim_type>both</trim_type>
+        <trim_type>none</trim_type>
         <length>-1</length>
         <precision>-1</precision>
       </field>
@@ -135,7 +162,7 @@ limitations under the License.
         <decimal/>
         <group/>
         <nullif/>
-        <trim_type>both</trim_type>
+        <trim_type>none</trim_type>
         <length>-1</length>
         <precision>-1</precision>
       </field>
@@ -147,7 +174,7 @@ limitations under the License.
         <decimal/>
         <group/>
         <nullif/>
-        <trim_type>both</trim_type>
+        <trim_type>none</trim_type>
         <length>-1</length>
         <precision>-1</precision>
       </field>
@@ -159,7 +186,7 @@ limitations under the License.
         <decimal/>
         <group/>
         <nullif/>
-        <trim_type>both</trim_type>
+        <trim_type>none</trim_type>
         <length>-1</length>
         <precision>-1</precision>
       </field>
@@ -171,7 +198,7 @@ limitations under the License.
         <decimal/>
         <group/>
         <nullif/>
-        <trim_type>both</trim_type>
+        <trim_type>none</trim_type>
         <length>-1</length>
         <precision>-1</precision>
       </field>
@@ -183,7 +210,7 @@ limitations under the License.
         <decimal/>
         <group/>
         <nullif/>
-        <trim_type>both</trim_type>
+        <trim_type>none</trim_type>
         <length>-1</length>
         <precision>-1</precision>
       </field>
@@ -195,7 +222,7 @@ limitations under the License.
         <decimal/>
         <group/>
         <nullif/>
-        <trim_type>both</trim_type>
+        <trim_type>none</trim_type>
         <length>-1</length>
         <precision>-1</precision>
       </field>
@@ -207,7 +234,7 @@ limitations under the License.
         <decimal/>
         <group/>
         <nullif/>
-        <trim_type>both</trim_type>
+        <trim_type>none</trim_type>
         <length>-1</length>
         <precision>-1</precision>
       </field>
@@ -219,15 +246,15 @@ limitations under the License.
         <decimal/>
         <group/>
         <nullif/>
-        <trim_type>both</trim_type>
+        <trim_type>none</trim_type>
         <length>-1</length>
         <precision>-1</precision>
       </field>
     </fields>
     <attributes/>
     <GUI>
-      <xloc>576</xloc>
-      <yloc>240</yloc>
+      <xloc>720</xloc>
+      <yloc>160</yloc>
     </GUI>
   </transform>
   <transform>
@@ -244,11 +271,12 @@ limitations under the License.
     <addToResult>N</addToResult>
     <createParentFolder>N</createParentFolder>
     <dateInFilename>N</dateInFilename>
+    <doNotOpenNewFileInit>Y</doNotOpenNewFileInit>
     <encoding>UTF-8</encoding>
     <extension>json</extension>
     <fileAppended>N</fileAppended>
     <fileAsCommand>N</fileAsCommand>
-    <fileName>gs://apache-hop-it/output/0007-customers-ca-fl-${Internal.Transform.ID}.json</fileName>
+    <fileName>gs://apache-hop-it/output/0007-customers-ca-fl</fileName>
     <jsonBloc>customers</jsonBloc>
     <nrRowsInBloc>50000</nrRowsInBloc>
     <operation_type>writetofile</operation_type>
@@ -301,8 +329,8 @@ limitations under the License.
     <transformNrInFilename>N</transformNrInFilename>
     <attributes/>
     <GUI>
-      <xloc>576</xloc>
-      <yloc>112</yloc>
+      <xloc>816</xloc>
+      <yloc>96</yloc>
     </GUI>
   </transform>
   <transform>
@@ -325,16 +353,12 @@ limitations under the License.
       <autosizecolums>Y</autosizecolums>
       <createParentFolder>N</createParentFolder>
       <add_date>N</add_date>
-      <date_time_format/>
-      <do_not_open_newfile_init>N</do_not_open_newfile_init>
+      <do_not_open_newfile_init>Y</do_not_open_newfile_init>
       <extension>xlsx</extension>
-      <name>gs://apache-hop-it/output/0007-customers-ca-fl-${Internal.Transform.ID}</name>
-      <filename_field/>
+      <name>gs://apache-hop-it/output/0007-customers-ca-fl</name>
       <filename_in_field>N</filename_in_field>
       <if_file_exists>new</if_file_exists>
       <if_sheet_exists>new</if_sheet_exists>
-      <password/>
-      <protected_by/>
       <protect_sheet>N</protect_sheet>
       <sheetname>CA-FL-Customers</sheetname>
       <SpecifyFormat>N</SpecifyFormat>
@@ -350,123 +374,64 @@ limitations under the License.
     <makeSheetActive>Y</makeSheetActive>
     <fields>
       <field>
-        <commentAuthorField/>
-        <commentField/>
         <format>0</format>
         <formula>N</formula>
-        <hyperlinkField/>
         <name>id</name>
-        <styleCell/>
         <title>id</title>
-        <titleStyleCell/>
         <type>Integer</type>
       </field>
       <field>
-        <commentAuthorField/>
-        <commentField/>
-        <format/>
         <formula>N</formula>
-        <hyperlinkField/>
         <name>lastName</name>
-        <styleCell/>
         <title>lastName</title>
-        <titleStyleCell/>
         <type>String</type>
       </field>
       <field>
-        <commentAuthorField/>
-        <commentField/>
-        <format/>
         <formula>N</formula>
-        <hyperlinkField/>
         <name>firstName</name>
-        <styleCell/>
         <title>firstName</title>
-        <titleStyleCell/>
         <type>String</type>
       </field>
       <field>
-        <commentAuthorField/>
-        <commentField/>
-        <format/>
         <formula>N</formula>
-        <hyperlinkField/>
         <name>zipCode</name>
-        <styleCell/>
         <title>zipCode</title>
-        <titleStyleCell/>
         <type>String</type>
       </field>
       <field>
-        <commentAuthorField/>
-        <commentField/>
-        <format/>
         <formula>N</formula>
-        <hyperlinkField/>
         <name>city</name>
-        <styleCell/>
         <title>city</title>
-        <titleStyleCell/>
         <type>String</type>
       </field>
       <field>
-        <commentAuthorField/>
-        <commentField/>
-        <format/>
         <formula>N</formula>
-        <hyperlinkField/>
         <name>birthdate</name>
-        <styleCell/>
         <title>birthdate</title>
-        <titleStyleCell/>
         <type>String</type>
       </field>
       <field>
-        <commentAuthorField/>
-        <commentField/>
-        <format/>
         <formula>N</formula>
-        <hyperlinkField/>
         <name>street</name>
-        <styleCell/>
         <title>street</title>
-        <titleStyleCell/>
         <type>String</type>
       </field>
       <field>
-        <commentAuthorField/>
-        <commentField/>
-        <format/>
         <formula>N</formula>
-        <hyperlinkField/>
         <name>housenr</name>
-        <styleCell/>
         <title>housenr</title>
-        <titleStyleCell/>
         <type>String</type>
       </field>
       <field>
-        <commentAuthorField/>
-        <commentField/>
-        <format/>
         <formula>N</formula>
-        <hyperlinkField/>
         <name>stateCode</name>
-        <styleCell/>
         <title>stateCode</title>
-        <titleStyleCell/>
         <type>String</type>
       </field>
       <field>
-        <commentAuthorField/>
-        <commentField/>
-        <format/>
         <formula>N</formula>
-        <hyperlinkField/>
         <name>state</name>
-        <styleCell/>
         <title>state</title>
-        <titleStyleCell/>
         <type>String</type>
       </field>
     </fields>
@@ -477,12 +442,85 @@ limitations under the License.
       <filename>template.xls</filename>
       <sheet_enabled>N</sheet_enabled>
       <hidden>N</hidden>
-      <sheetname/>
     </template>
     <attributes/>
     <GUI>
-      <xloc>576</xloc>
-      <yloc>368</yloc>
+      <xloc>608</xloc>
+      <yloc>208</yloc>
+    </GUI>
+  </transform>
+  <transform>
+    <name>0007-customers-fl-ca.parquet</name>
+    <type>ParquetFileOutput</type>
+    <description/>
+    <distribute>Y</distribute>
+    <custom_distribution/>
+    <copies>SINGLE_BEAM</copies>
+    <partitioning>
+      <method>none</method>
+      <schema_name/>
+    </partitioning>
+    <compression_codec>UNCOMPRESSED</compression_codec>
+    <data_page_size>1048576</data_page_size>
+    <dictionary_page_size>1048576</dictionary_page_size>
+    <fields>
+      <field>
+        <source_field>id</source_field>
+        <target_field>id</target_field>
+      </field>
+      <field>
+        <source_field>lastName</source_field>
+        <target_field>lastName</target_field>
+      </field>
+      <field>
+        <source_field>firstName</source_field>
+        <target_field>firstName</target_field>
+      </field>
+      <field>
+        <source_field>zipCode</source_field>
+        <target_field>zipCode</target_field>
+      </field>
+      <field>
+        <source_field>city</source_field>
+        <target_field>city</target_field>
+      </field>
+      <field>
+        <source_field>birthdate</source_field>
+        <target_field>birthdate</target_field>
+      </field>
+      <field>
+        <source_field>street</source_field>
+        <target_field>street</target_field>
+      </field>
+      <field>
+        <source_field>housenr</source_field>
+        <target_field>housenr</target_field>
+      </field>
+      <field>
+        <source_field>stateCode</source_field>
+        <target_field>stateCode</target_field>
+      </field>
+      <field>
+        <source_field>state</source_field>
+        <target_field>state</target_field>
+      </field>
+    </fields>
+    <filename_split_size>1000000</filename_split_size>
+    <filename_base>gs://apache-hop-it/output/0007-customers-fl-ca</filename_base>
+    <filename_create_parent_folders>Y</filename_create_parent_folders>
+    <filename_datetime_format>yyyyMMdd-HHmmss</filename_datetime_format>
+    <filename_ext>parquet</filename_ext>
+    <filename_include_copy>N</filename_include_copy>
+    <filename_include_date>N</filename_include_date>
+    <filename_include_datetime>N</filename_include_datetime>
+    <filename_include_split>Y</filename_include_split>
+    <filename_include_time>N</filename_include_time>
+    <row_group_size>20000</row_group_size>
+    <version>2.0</version>
+    <attributes/>
+    <GUI>
+      <xloc>496</xloc>
+      <yloc>256</yloc>
     </GUI>
   </transform>
   <transform>
@@ -518,7 +556,7 @@ limitations under the License.
     <attributes/>
     <GUI>
       <xloc>304</xloc>
-      <yloc>240</yloc>
+      <yloc>96</yloc>
     </GUI>
   </transform>
   <transform>
@@ -537,7 +575,7 @@ limitations under the License.
     <attributes/>
     <GUI>
       <xloc>128</xloc>
-      <yloc>240</yloc>
+      <yloc>96</yloc>
     </GUI>
   </transform>
   <transform_error_handling>
diff --git a/integration-tests/gcp/metadata/unit-test/0007-single-thread-validation UNIT.json b/integration-tests/gcp/metadata/unit-test/0007-single-thread-validation UNIT.json
index fb745be473..111fc9e60c 100644
--- a/integration-tests/gcp/metadata/unit-test/0007-single-thread-validation UNIT.json	
+++ b/integration-tests/gcp/metadata/unit-test/0007-single-thread-validation UNIT.json	
@@ -101,6 +101,55 @@
       ],
       "transform_name": "csv",
       "data_set_name": "0007-single-thread-golden"
+    },
+    {
+      "field_mappings": [
+        {
+          "transform_field": "lastName",
+          "data_set_field": "lastName"
+        },
+        {
+          "transform_field": "id",
+          "data_set_field": "id"
+        },
+        {
+          "transform_field": "firstName",
+          "data_set_field": "firstName"
+        },
+        {
+          "transform_field": "zipCode",
+          "data_set_field": "zipCode"
+        },
+        {
+          "transform_field": "city",
+          "data_set_field": "city"
+        },
+        {
+          "transform_field": "birthdate",
+          "data_set_field": "birthdate"
+        },
+        {
+          "transform_field": "street",
+          "data_set_field": "street"
+        },
+        {
+          "transform_field": "housenr",
+          "data_set_field": "housenr"
+        },
+        {
+          "transform_field": "stateCode",
+          "data_set_field": "stateCode"
+        },
+        {
+          "transform_field": "state",
+          "data_set_field": "state"
+        }
+      ],
+      "field_order": [
+        "id"
+      ],
+      "transform_name": "parquet",
+      "data_set_name": "0007-single-thread-golden"
     }
   ],
   "input_data_sets": [],
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBaseFn.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBaseFn.java
index dac3e368c6..1cfcecc35c 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBaseFn.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBaseFn.java
@@ -21,6 +21,8 @@ package org.apache.hop.beam.core.transform;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hop.beam.core.HopRow;
 import org.apache.hop.core.Const;
@@ -40,6 +42,7 @@ import org.apache.hop.pipeline.config.PipelineRunConfiguration;
 import org.apache.hop.pipeline.transform.ITransform;
 import org.apache.hop.pipeline.transform.RowAdapter;
 import org.apache.hop.pipeline.transform.stream.IStream;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +62,7 @@ public abstract class TransformBaseFn extends DoFn<HopRow, HopRow> {
   protected transient List<IExecutionDataSampler> dataSamplers;
   protected transient List<IExecutionDataSamplerStore> dataSamplerStores;
   protected transient Timer executionInfoTimer;
+  protected transient BoundedWindow batchWindow;
 
   public TransformBaseFn(String parentLogChannelId, String runConfigName, String dataSamplersJson) {
     this.parentLogChannelId = parentLogChannelId;
@@ -187,4 +191,39 @@ public abstract class TransformBaseFn extends DoFn<HopRow, HopRow> {
               Const.toLong(executionInfoLocation.getDataLoggingInterval(), 10000L));
     }
   }
+
+  protected interface TupleOutputContext<T> {
+    void output(TupleTag<T> tupleTag, T output);
+  }
+
+  protected class TransformProcessContext implements TupleOutputContext<HopRow> {
+
+    private DoFn.ProcessContext context;
+
+    public TransformProcessContext(DoFn.ProcessContext processContext) {
+      this.context = processContext;
+    }
+
+    @Override
+    public void output(TupleTag<HopRow> tupleTag, HopRow output) {
+      context.output(tupleTag, output);
+    }
+  }
+
+  protected class TransformFinishBundleContext implements TupleOutputContext<HopRow> {
+
+    private DoFn.FinishBundleContext context;
+    private BoundedWindow batchWindow;
+
+    public TransformFinishBundleContext(
+            DoFn.FinishBundleContext context, BoundedWindow batchWindow) {
+      this.context = context;
+      this.batchWindow = batchWindow;
+    }
+
+    @Override
+    public void output(TupleTag<HopRow> tupleTag, HopRow output) {
+      context.output(tupleTag, output, Instant.now(), batchWindow);
+    }
+  }
 }
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBatchTransform.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBatchTransform.java
index b01f7b44ce..08a7bada95 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBatchTransform.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformBatchTransform.java
@@ -19,7 +19,6 @@ package org.apache.hop.beam.core.transform;
 
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.*;
@@ -52,7 +51,6 @@ import org.apache.hop.pipeline.transform.*;
 import org.apache.hop.pipeline.transforms.dummy.DummyMeta;
 import org.apache.hop.pipeline.transforms.injector.InjectorField;
 import org.apache.hop.pipeline.transforms.injector.InjectorMeta;
-import org.joda.time.Instant;
 
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -232,7 +230,6 @@ public class TransformBatchTransform extends TransformTransform {
     private transient SingleThreadedPipelineExecutor executor;
 
     private transient Queue<HopRow> rowBuffer;
-    private transient BoundedWindow batchWindow;
 
     private transient AtomicLong lastTimerCheck;
     private transient Timer timer;
@@ -838,38 +835,6 @@ public class TransformBatchTransform extends TransformTransform {
     }
   }
 
-  private interface TupleOutputContext<T> {
-    void output(TupleTag<T> tupleTag, T output);
-  }
-
-  private class TransformProcessContext implements TupleOutputContext<HopRow> {
 
-    private DoFn.ProcessContext context;
-
-    public TransformProcessContext(DoFn.ProcessContext processContext) {
-      this.context = processContext;
-    }
-
-    @Override
-    public void output(TupleTag<HopRow> tupleTag, HopRow output) {
-      context.output(tupleTag, output);
-    }
-  }
 
-  private class TransformFinishBundleContext implements TupleOutputContext<HopRow> {
-
-    private DoFn.FinishBundleContext context;
-    private BoundedWindow batchWindow;
-
-    public TransformFinishBundleContext(
-        DoFn.FinishBundleContext context, BoundedWindow batchWindow) {
-      this.context = context;
-      this.batchWindow = batchWindow;
-    }
-
-    @Override
-    public void output(TupleTag<HopRow> tupleTag, HopRow output) {
-      context.output(tupleTag, output, Instant.now(), batchWindow);
-    }
-  }
 }
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformFn.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformFn.java
index e808351977..55b5b63fdc 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformFn.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformFn.java
@@ -18,6 +18,7 @@
 
 package org.apache.hop.beam.core.transform;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -50,6 +51,7 @@ import org.apache.hop.pipeline.transforms.dummy.DummyMeta;
 import org.apache.hop.pipeline.transforms.injector.InjectorField;
 import org.apache.hop.pipeline.transforms.injector.InjectorMeta;
 import org.joda.time.Instant;
+import org.json.simple.parser.ParseException;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -139,8 +141,7 @@ public class TransformFn extends TransformBaseFn {
   }
 
   @Setup
-  public void setup() {
-  }
+  public void setup() {}
 
   /**
    * Reset the row buffer every time we start a new bundle to prevent the output of double rows
@@ -149,297 +150,327 @@ public class TransformFn extends TransformBaseFn {
    */
   @StartBundle
   public void startBundle(StartBundleContext startBundleContext) {
-    if ("ScriptValueMod".equals(transformPluginId) && pipeline != null) {
-      // Force re-initialization for this specific transform plugin
-      initialize = true;
+    try {
+      // TODO: create a test to see if this is still needed
+      //
+      //      if ("ScriptValueMod".equals(transformPluginId) && pipeline != null) {
+      //        // Force re-initialization for this specific transform plugin
+      //        initialize = true;
+      //      }
+      //
+
+      // Call start of the bundle on the transform
+      //
+      if (executor != null) {
+        // Increment the bundle number before calling the next startBundle() method in the Hop transforms.
+        //
+        executor
+            .getPipeline()
+            .getTransforms()
+            .forEach(combi -> combi.data.setBeamBundleNr(combi.data.getBeamBundleNr() + 1));
+
+        executor.startBundle();
+      }
+    } catch (HopException e) {
+      throw new RuntimeException("Error at start of bundle!", e);
     }
   }
 
-
   @ProcessElement
   public void processElement(ProcessContext context, BoundedWindow window) {
-
     try {
       if (initialize) {
-        initialize = false;
-        // Initialize Hop and load extra plugins as well
-        //
-        BeamHop.init(transformPluginClasses, xpPluginClasses);
+        initializeTransformPipeline(context);
+      }
 
-        // The content of the metadata is JSON serialized and inflated below.
-        //
-        IHopMetadataProvider metadataProvider = new SerializableMetadataProvider(metastoreJson);
-        IVariables variables = new Variables();
-        for (VariableValue variableValue : variableValues) {
-          if (StringUtils.isNotEmpty(variableValue.getVariable())) {
-            variables.setVariable(variableValue.getVariable(), variableValue.getValue());
-          }
-        }
+      // Get one row from the context main input and make a copy, so we can change it.
+      //
+      HopRow originalInputRow = context.element();
+      HopRow inputRow = HopBeamUtil.copyHopRow(originalInputRow, inputRowMeta);
+      readCounter.inc();
 
-        // Create a very simple new transformation to run single threaded...
-        // Single threaded...
-        //
-        pipelineMeta = new PipelineMeta();
-        pipelineMeta.setName(transformName);
-        pipelineMeta.setPipelineType(PipelineMeta.PipelineType.SingleThreaded);
-        pipelineMeta.setMetadataProvider(metadataProvider);
+      emptyRowBuffer(new TransformProcessContext(context), inputRow);
+    } catch (Exception e) {
+      numErrors.inc();
+      LOG.error("Transform execution error :" + e.getMessage());
+      throw new RuntimeException("Error executing TransformFn", e);
+    }
+  }
 
-        // Input row metadata...
-        //
-        inputRowMeta = JsonRowMeta.fromJson(inputRowMetaJson);
-        List<IRowMeta> infoRowMetas = new ArrayList<>();
-        for (String infoRowMetaJson : infoRowMetaJsons) {
-          IRowMeta infoRowMeta = JsonRowMeta.fromJson(infoRowMetaJson);
-          infoRowMetas.add(infoRowMeta);
-        }
+  private void initializeTransformPipeline(DoFn<HopRow, HopRow>.ProcessContext context)
+      throws HopException, ParseException, JsonProcessingException {
+    initialize = false;
+    // Initialize Hop and load extra plugins as well
+    //
+    BeamHop.init(transformPluginClasses, xpPluginClasses);
 
-        // Create an Injector transform with the right row layout...
-        // This will help all transforms see the row layout statically...
-        //
-        TransformMeta mainInjectorTransformMeta = null;
-        if (!inputTransform) {
-          mainInjectorTransformMeta =
-              createInjectorTransform(
-                  pipelineMeta, INJECTOR_TRANSFORM_NAME, inputRowMeta, 200, 200);
-        }
+    // The content of the metadata is JSON serialized and inflated below.
+    //
+    IHopMetadataProvider metadataProvider = new SerializableMetadataProvider(metastoreJson);
+    IVariables variables = new Variables();
+    for (VariableValue variableValue : variableValues) {
+      if (StringUtils.isNotEmpty(variableValue.getVariable())) {
+        variables.setVariable(variableValue.getVariable(), variableValue.getValue());
+      }
+    }
 
-        // Our main transform writes to a bunch of targets
-        // Add a dummy transform for each one so the transform can target them
-        //
-        int targetLocationY = 200;
-        List<TransformMeta> targetTransformMetas = new ArrayList<>();
-        for (String targetTransform : targetTransforms) {
-          DummyMeta dummyMeta = new DummyMeta();
-          TransformMeta targetTransformMeta = new TransformMeta(targetTransform, dummyMeta);
-          targetTransformMeta.setLocation(600, targetLocationY);
-          targetLocationY += 150;
-
-          targetTransformMetas.add(targetTransformMeta);
-          pipelineMeta.addTransform(targetTransformMeta);
-        }
+    // Create a very simple new transformation to run single threaded...
+    // Single threaded...
+    //
+    pipelineMeta = new PipelineMeta();
+    pipelineMeta.setName(transformName);
+    pipelineMeta.setPipelineType(PipelineMeta.PipelineType.SingleThreaded);
+    pipelineMeta.setMetadataProvider(metadataProvider);
 
-        // The transform might read information from info transforms
-        // like "Stream Lookup" or "Validator".
-        // They read all the data on input from a side input.
-        //
-        List<List<HopRow>> infoDataSets = new ArrayList<>();
-        List<TransformMeta> infoTransformMetas = new ArrayList<>();
-        for (int i = 0; i < infoTransforms.size(); i++) {
-          String infoTransform = infoTransforms.get(i);
-          PCollectionView<List<HopRow>> cv = infoCollectionViews.get(i);
-
-          // Get the data from the side input, from the info transform(s)
-          //
-          List<HopRow> infoDataSet = context.sideInput(cv);
-          infoDataSets.add(infoDataSet);
-
-          IRowMeta infoRowMeta = infoRowMetas.get(i);
-
-          // Add an Injector transform for every info transform so the transform can read from it
-          //
-          TransformMeta infoTransformMeta =
-              createInjectorTransform(pipelineMeta, infoTransform, infoRowMeta, 200, 350 + 150 * i);
-          infoTransformMetas.add(infoTransformMeta);
-        }
+    // Input row metadata...
+    //
+    inputRowMeta = JsonRowMeta.fromJson(inputRowMetaJson);
+    List<IRowMeta> infoRowMetas = new ArrayList<>();
+    for (String infoRowMetaJson : infoRowMetaJsons) {
+      IRowMeta infoRowMeta = JsonRowMeta.fromJson(infoRowMetaJson);
+      infoRowMetas.add(infoRowMeta);
+    }
 
-        transformCombis = new ArrayList<>();
+    // Create an Injector transform with the right row layout...
+    // This will help all transforms see the row layout statically...
+    //
+    TransformMeta mainInjectorTransformMeta = null;
+    if (!inputTransform) {
+      mainInjectorTransformMeta =
+          createInjectorTransform(pipelineMeta, INJECTOR_TRANSFORM_NAME, inputRowMeta, 200, 200);
+    }
 
-        // The main transform inflated from XML metadata...
-        //
-        PluginRegistry registry = PluginRegistry.getInstance();
-        ITransformMeta iTransformMeta =
-            registry.loadClass(TransformPluginType.class, transformPluginId, ITransformMeta.class);
-        if (iTransformMeta == null) {
-          throw new HopException(
-              "Unable to load transform plugin with ID "
-                  + transformPluginId
-                  + ", this plugin isn't in the plugin registry or classpath");
-        }
+    // Our main transform writes to a bunch of targets
+    // Add a dummy transform for each one so the transform can target them
+    //
+    int targetLocationY = 200;
+    List<TransformMeta> targetTransformMetas = new ArrayList<>();
+    for (String targetTransform : targetTransforms) {
+      DummyMeta dummyMeta = new DummyMeta();
+      TransformMeta targetTransformMeta = new TransformMeta(targetTransform, dummyMeta);
+      targetTransformMeta.setLocation(600, targetLocationY);
+      targetLocationY += 150;
+
+      targetTransformMetas.add(targetTransformMeta);
+      pipelineMeta.addTransform(targetTransformMeta);
+    }
 
-        HopBeamUtil.loadTransformMetadataFromXml(
-            transformName,
-            iTransformMeta,
-            transformMetaInterfaceXml,
-            pipelineMeta.getMetadataProvider());
-
-        transformMeta = new TransformMeta(transformName, iTransformMeta);
-        transformMeta.setTransformPluginId(transformPluginId);
-        transformMeta.setLocation(400, 200);
-        pipelineMeta.addTransform(transformMeta);
-        if (!inputTransform) {
-          pipelineMeta.addPipelineHop(
-              new PipelineHopMeta(mainInjectorTransformMeta, transformMeta));
-        }
-        // The target hops as well
-        //
-        for (TransformMeta targetTransformMeta : targetTransformMetas) {
-          pipelineMeta.addPipelineHop(new PipelineHopMeta(transformMeta, targetTransformMeta));
-        }
+    // The transform might read information from info transforms
+    // like "Stream Lookup" or "Validator".
+    // They read all the data on input from a side input.
+    //
+    List<List<HopRow>> infoDataSets = new ArrayList<>();
+    List<TransformMeta> infoTransformMetas = new ArrayList<>();
+    for (int i = 0; i < infoTransforms.size(); i++) {
+      String infoTransform = infoTransforms.get(i);
+      PCollectionView<List<HopRow>> cv = infoCollectionViews.get(i);
 
-        // And the info hops...
-        //
-        for (TransformMeta infoTransformMeta : infoTransformMetas) {
-          pipelineMeta.addPipelineHop(new PipelineHopMeta(infoTransformMeta, transformMeta));
-        }
+      // Get the data from the side input, from the info transform(s)
+      //
+      List<HopRow> infoDataSet = context.sideInput(cv);
+      infoDataSets.add(infoDataSet);
 
-        // If we are sending execution information to a location, see if we have any extra data
-        // samplers
-        // The data samplers list is composed of those in the data profile along with the set from
-        // the extra ones in the parent pipeline.
-        //
-        lookupExecutionInformation(metadataProvider);
+      IRowMeta infoRowMeta = infoRowMetas.get(i);
+
+      // Add an Injector transform for every info transform so the transform can read from it
+      //
+      TransformMeta infoTransformMeta =
+          createInjectorTransform(pipelineMeta, infoTransform, infoRowMeta, 200, 350 + 150 * i);
+      infoTransformMetas.add(infoTransformMeta);
+    }
 
-        iTransformMeta.searchInfoAndTargetTransforms(pipelineMeta.getTransforms());
+    transformCombis = new ArrayList<>();
 
-        // Create the transformation...
-        //
-        pipeline =
-            new LocalPipelineEngine(
-                pipelineMeta, variables, new LoggingObject("apache-beam-transform"));
-        pipeline.setLogLevel(
-            context.getPipelineOptions().as(HopPipelineExecutionOptions.class).getLogLevel());
-        pipeline.setMetadataProvider(pipelineMeta.getMetadataProvider());
-
-        // Change the name to make the logging less confusing.
-        //
-        pipeline
-            .getPipelineRunConfiguration()
-            .setName("beam-transform-local (" + transformName + ")");
+    // The main transform inflated from XML metadata...
+    //
+    PluginRegistry registry = PluginRegistry.getInstance();
+    ITransformMeta iTransformMeta =
+        registry.loadClass(TransformPluginType.class, transformPluginId, ITransformMeta.class);
+    if (iTransformMeta == null) {
+      throw new HopException(
+          "Unable to load transform plugin with ID "
+              + transformPluginId
+              + ", this plugin isn't in the plugin registry or classpath");
+    }
 
-        pipeline.prepareExecution();
+    HopBeamUtil.loadTransformMetadataFromXml(
+        transformName,
+        iTransformMeta,
+        transformMetaInterfaceXml,
+        pipelineMeta.getMetadataProvider());
 
-        // Create producers so we can efficiently pass data
-        //
-        rowProducer = null;
-        if (!inputTransform) {
-          rowProducer = pipeline.addRowProducer(INJECTOR_TRANSFORM_NAME, 0);
-        }
-        List<RowProducer> infoRowProducers = new ArrayList<>();
-        for (String infoTransform : infoTransforms) {
-          RowProducer infoRowProducer = pipeline.addRowProducer(infoTransform, 0);
-          infoRowProducers.add(infoRowProducer);
-        }
+    transformMeta = new TransformMeta(transformName, iTransformMeta);
+    transformMeta.setTransformPluginId(transformPluginId);
+    transformMeta.setLocation(400, 200);
+    pipelineMeta.addTransform(transformMeta);
+    if (!inputTransform) {
+      pipelineMeta.addPipelineHop(new PipelineHopMeta(mainInjectorTransformMeta, transformMeta));
+    }
+    // The target hops as well
+    //
+    for (TransformMeta targetTransformMeta : targetTransformMetas) {
+      pipelineMeta.addPipelineHop(new PipelineHopMeta(transformMeta, targetTransformMeta));
+    }
 
-        // Find the right interfaces for execution later...
-        //
-        if (!inputTransform) {
-          TransformMetaDataCombi injectorCombi = findCombi(pipeline, INJECTOR_TRANSFORM_NAME);
-          transformCombis.add(injectorCombi);
-        }
+    // And the info hops...
+    //
+    for (TransformMeta infoTransformMeta : infoTransformMetas) {
+      pipelineMeta.addPipelineHop(new PipelineHopMeta(infoTransformMeta, transformMeta));
+    }
 
-        TransformMetaDataCombi transformCombi = findCombi(pipeline, transformName);
-        transformCombis.add(transformCombi);
-
-        if (targetTransforms.isEmpty()) {
-          IRowListener rowListener =
-              new RowAdapter() {
-                @Override
-                public void rowWrittenEvent(IRowMeta rowMeta, Object[] row)
-                    throws HopTransformException {
-                  resultRows.add(new HopRow(row, rowMeta.size()));
-                }
-              };
-          transformCombi.transform.addRowListener(rowListener);
-        }
+    // If we are sending execution information to a location, see if we have any extra data
+    // samplers
+    // The data samplers list is composed of those in the data profile along with the set from
+    // the extra ones in the parent pipeline.
+    //
+    lookupExecutionInformation(metadataProvider);
 
-        // Create a list of TupleTag to direct the target rows
-        //
-        mainTupleTag = new TupleTag<>(HopBeamUtil.createMainOutputTupleId(transformName)) {};
-        tupleTagList = new ArrayList<>();
+    iTransformMeta.searchInfoAndTargetTransforms(pipelineMeta.getTransforms());
 
-        // The lists in here will contain all the rows that ended up in the various target
-        // transforms (if any)
-        //
-        targetResultRowsList = new ArrayList<>();
-
-        for (String targetTransform : targetTransforms) {
-          TransformMetaDataCombi targetCombi = findCombi(pipeline, targetTransform);
-          transformCombis.add(targetCombi);
-
-          String tupleId = HopBeamUtil.createTargetTupleId(transformName, targetTransform);
-          TupleTag<HopRow> tupleTag = new TupleTag<>(tupleId) {};
-          tupleTagList.add(tupleTag);
-          final List<Object[]> targetResultRows = new ArrayList<>();
-          targetResultRowsList.add(targetResultRows);
-
-          targetCombi.transform.addRowListener(
-              new RowAdapter() {
-                @Override
-                public void rowReadEvent(IRowMeta rowMeta, Object[] row)
-                    throws HopTransformException {
-                  // We send the target row to a specific list...
-                  //
-                  targetResultRows.add(row);
-                }
-              });
-        }
+    // Create the transformation...
+    //
+    pipeline =
+        new LocalPipelineEngine(
+            pipelineMeta, variables, new LoggingObject("apache-beam-transform"));
+    pipeline.setLogLevel(
+        context.getPipelineOptions().as(HopPipelineExecutionOptions.class).getLogLevel());
+    pipeline.setMetadataProvider(pipelineMeta.getMetadataProvider());
+
+    // Change the name to make the logging less confusing.
+    //
+    pipeline.getPipelineRunConfiguration().setName("beam-transform-local (" + transformName + ")");
 
-        attachExecutionSamplersToOutput(
-            variables,
-            transformName,
-            pipeline.getLogChannelId(),
-            inputRowMeta,
-            pipelineMeta.getTransformFields(variables, transformName),
-            pipeline.getTransform(transformName, 0));
+    pipeline.prepareExecution();
 
-        executor = new SingleThreadedPipelineExecutor(pipeline);
+    // Indicate that we're dealing with a Beam pipeline during execution
+    // Start counting bundle numbers from 1
+    //
+    pipeline.getTransforms().forEach(c -> {
+      c.data.setBeamContext(true);
+      c.data.setBeamBundleNr(1);
+    });
 
-        // Initialize the transforms...
-        //
-        executor.init();
+    // Create producers so we can efficiently pass data
+    //
+    rowProducer = null;
+    if (!inputTransform) {
+      rowProducer = pipeline.addRowProducer(INJECTOR_TRANSFORM_NAME, 0);
+    }
+    List<RowProducer> infoRowProducers = new ArrayList<>();
+    for (String infoTransform : infoTransforms) {
+      RowProducer infoRowProducer = pipeline.addRowProducer(infoTransform, 0);
+      infoRowProducers.add(infoRowProducer);
+    }
 
-        Counter initCounter = Metrics.counter(Pipeline.METRIC_NAME_INIT, transformName);
-        readCounter = Metrics.counter(Pipeline.METRIC_NAME_READ, transformName);
-        writtenCounter = Metrics.counter(Pipeline.METRIC_NAME_WRITTEN, transformName);
+    // Find the right interfaces for execution later...
+    //
+    if (!inputTransform) {
+      TransformMetaDataCombi injectorCombi = findCombi(pipeline, INJECTOR_TRANSFORM_NAME);
+      transformCombis.add(injectorCombi);
+    }
 
-        initCounter.inc();
+    TransformMetaDataCombi transformCombi = findCombi(pipeline, transformName);
+    transformCombis.add(transformCombi);
+
+    if (targetTransforms.isEmpty()) {
+      IRowListener rowListener =
+          new RowAdapter() {
+            @Override
+            public void rowWrittenEvent(IRowMeta rowMeta, Object[] row)
+                throws HopTransformException {
+              resultRows.add(new HopRow(row, rowMeta.size()));
+            }
+          };
+      transformCombi.transform.addRowListener(rowListener);
+    }
 
-        // Doesn't really start the threads in single threaded mode
-        // Just sets some flags all over the place
-        //
-        pipeline.startThreads();
+    // Create a list of TupleTag to direct the target rows
+    //
+    mainTupleTag = new TupleTag<>(HopBeamUtil.createMainOutputTupleId(transformName)) {};
+    tupleTagList = new ArrayList<>();
 
-        resultRows = new ArrayList<>();
+    // The lists in here will contain all the rows that ended up in the various target
+    // transforms (if any)
+    //
+    targetResultRowsList = new ArrayList<>();
+
+    for (String targetTransform : targetTransforms) {
+      TransformMetaDataCombi targetCombi = findCombi(pipeline, targetTransform);
+      transformCombis.add(targetCombi);
+
+      String tupleId = HopBeamUtil.createTargetTupleId(transformName, targetTransform);
+      TupleTag<HopRow> tupleTag = new TupleTag<>(tupleId) {};
+      tupleTagList.add(tupleTag);
+      final List<Object[]> targetResultRows = new ArrayList<>();
+      targetResultRowsList.add(targetResultRows);
+
+      targetCombi.transform.addRowListener(
+          new RowAdapter() {
+            @Override
+            public void rowReadEvent(IRowMeta rowMeta, Object[] row) throws HopTransformException {
+              // We send the target row to a specific list...
+              //
+              targetResultRows.add(row);
+            }
+          });
+    }
 
-        // Copy the info data sets to the info transforms...
-        // We do this only once so all subsequent rows can use this.
-        //
-        for (int i = 0; i < infoTransforms.size(); i++) {
-          RowProducer infoRowProducer = infoRowProducers.get(i);
-          List<HopRow> infoDataSet = infoDataSets.get(i);
-          TransformMetaDataCombi combi = findCombi(pipeline, infoTransforms.get(i));
-          IRowMeta infoRowMeta = infoRowMetas.get(i);
-
-          // Pass and process the rows in the info transforms
-          //
-          for (HopRow infoRowData : infoDataSet) {
-            infoRowProducer.putRow(infoRowMeta, infoRowData.getRow());
-            combi.transform.processRow();
-          }
-
-          // By calling finished() transforms like Stream Lookup know no more rows are going to
-          // come, and they can start to work with the info data set
-          //
-          infoRowProducer.finished();
-
-          // Call once more to flag input as done, transform as finished.
-          //
-          combi.transform.processRow();
-        }
+    attachExecutionSamplersToOutput(
+        variables,
+        transformName,
+        pipeline.getLogChannelId(),
+        inputRowMeta,
+        pipelineMeta.getTransformFields(variables, transformName),
+        pipeline.getTransform(transformName, 0));
+
+    executor = new SingleThreadedPipelineExecutor(pipeline);
+
+    // Initialize the transforms...
+    //
+    executor.init();
+
+    Counter initCounter = Metrics.counter(Pipeline.METRIC_NAME_INIT, transformName);
+    readCounter = Metrics.counter(Pipeline.METRIC_NAME_READ, transformName);
+    writtenCounter = Metrics.counter(Pipeline.METRIC_NAME_WRITTEN, transformName);
+
+    initCounter.inc();
+
+    // Doesn't really start the threads in single threaded mode
+    // Just sets some flags all over the place
+    //
+    pipeline.startThreads();
+
+    resultRows = new ArrayList<>();
+
+    // Copy the info data sets to the info transforms...
+    // We do this only once so all subsequent rows can use this.
+    //
+    for (int i = 0; i < infoTransforms.size(); i++) {
+      RowProducer infoRowProducer = infoRowProducers.get(i);
+      List<HopRow> infoDataSet = infoDataSets.get(i);
+      TransformMetaDataCombi combi = findCombi(pipeline, infoTransforms.get(i));
+      IRowMeta infoRowMeta = infoRowMetas.get(i);
+
+      // Pass and process the rows in the info transforms
+      //
+      for (HopRow infoRowData : infoDataSet) {
+        infoRowProducer.putRow(infoRowMeta, infoRowData.getRow());
+        combi.transform.processRow();
       }
 
-      // Get one row from the context main input and make a copy, so we can change it.
+      // By calling finished() transforms like Stream Lookup know no more rows are going to
+      // come, and they can start to work with the info data set
       //
-      HopRow originalInputRow = context.element();
-      HopRow inputRow = HopBeamUtil.copyHopRow(originalInputRow, inputRowMeta);
-      readCounter.inc();
+      infoRowProducer.finished();
 
-      emptyRowBuffer(new TransformProcessContext(context), inputRow);
-    } catch (Exception e) {
-      numErrors.inc();
-      LOG.error("Transform execution error :" + e.getMessage());
-      throw new RuntimeException("Error executing TransformFn", e);
+      // Call once more to flag input as done, transform as finished.
+      //
+      combi.transform.processRow();
     }
+
+    // Flag the start of a bundle for the first time.
+    //
+    executor.startBundle();
   }
 
   /**
@@ -568,7 +599,11 @@ public class TransformFn extends TransformBaseFn {
   @FinishBundle
   public void finishBundle(FinishBundleContext context) {
     try {
-
+      // Signal the end of the bundle on the transform
+      //
+      if (executor != null) {
+        executor.finishBundle();
+      }
     } catch (Exception e) {
       numErrors.inc();
       LOG.error("Transform finishing bundle error :" + e.getMessage());
@@ -593,11 +628,9 @@ public class TransformFn extends TransformBaseFn {
       }
     } catch (Exception e) {
       LOG.error(
-          "Error sending row samples to execution info location for transform " + transformName,
-          e);
+          "Error sending row samples to execution info location for transform " + transformName, e);
       throw new RuntimeException(
-          "Error sending row samples to execution info location for transform " + transformName,
-          e);
+          "Error sending row samples to execution info location for transform " + transformName, e);
     }
   }
 }
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformTransform.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformTransform.java
index 105868a3aa..cb482aa21f 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformTransform.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/transform/TransformTransform.java
@@ -19,37 +19,13 @@ package org.apache.hop.beam.core.transform;
 
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.*;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hop.beam.core.BeamHop;
 import org.apache.hop.beam.core.HopRow;
 import org.apache.hop.beam.core.shared.VariableValue;
 import org.apache.hop.beam.core.util.HopBeamUtil;
-import org.apache.hop.beam.core.util.JsonRowMeta;
-import org.apache.hop.beam.engines.HopPipelineExecutionOptions;
-import org.apache.hop.core.exception.HopException;
-import org.apache.hop.core.exception.HopTransformException;
-import org.apache.hop.core.logging.LogLevel;
-import org.apache.hop.core.logging.LoggingObject;
-import org.apache.hop.core.metadata.SerializableMetadataProvider;
-import org.apache.hop.core.plugins.PluginRegistry;
-import org.apache.hop.core.plugins.TransformPluginType;
-import org.apache.hop.core.row.IRowMeta;
-import org.apache.hop.core.row.IValueMeta;
-import org.apache.hop.core.variables.IVariables;
-import org.apache.hop.core.variables.Variables;
-import org.apache.hop.metadata.api.IHopMetadataProvider;
-import org.apache.hop.pipeline.*;
-import org.apache.hop.pipeline.engines.local.LocalPipelineEngine;
-import org.apache.hop.pipeline.transform.*;
-import org.apache.hop.pipeline.transforms.dummy.DummyMeta;
-import org.apache.hop.pipeline.transforms.injector.InjectorField;
-import org.apache.hop.pipeline.transforms.injector.InjectorMeta;
-import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/HopPipelineMetaToBeamPipelineConverter.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/HopPipelineMetaToBeamPipelineConverter.java
index a97b42fb44..9b0dce24cb 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/HopPipelineMetaToBeamPipelineConverter.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/HopPipelineMetaToBeamPipelineConverter.java
@@ -557,9 +557,9 @@ public class HopPipelineMetaToBeamPipelineConverter {
       throw new HopException(
           "Group By is not supported.  Use the Memory Group By transform instead.  It comes closest to Beam functionality.");
     }
-    if (meta instanceof SortRowsMeta) {
-      throw new HopException("Sort rows is not yet supported on Beam.");
-    }
+//    if (meta instanceof SortRowsMeta) {
+//      throw new HopException("Sort rows is not yet supported on Beam.");
+//    }
     if (meta instanceof UniqueRowsMeta) {
       throw new HopException(
           "The unique rows transform is not yet supported on Beam, for now use a Memory Group By to get distrinct rows");
diff --git a/plugins/tech/neo4j/src/main/java/org/apache/hop/neo4j/transforms/cypher/Cypher.java b/plugins/tech/neo4j/src/main/java/org/apache/hop/neo4j/transforms/cypher/Cypher.java
index 613c966fe4..71216b6b1c 100644
--- a/plugins/tech/neo4j/src/main/java/org/apache/hop/neo4j/transforms/cypher/Cypher.java
+++ b/plugins/tech/neo4j/src/main/java/org/apache/hop/neo4j/transforms/cypher/Cypher.java
@@ -690,13 +690,13 @@ public class Cypher extends BaseTransform<CypherMeta, CypherData> {
   }
 
   @Override
-  public void batchComplete() {
+  public void batchComplete() throws HopException {
     try {
       wrapUpTransaction();
     } catch (Exception e) {
       setErrors(getErrors() + 1);
       stopAll();
-      throw new RuntimeException("Unable to complete batch of records", e);
+      throw new HopException("Unable to complete batch of records", e);
     }
   }
 
diff --git a/plugins/tech/parquet/pom.xml b/plugins/tech/parquet/pom.xml
index 5c4bc8632e..4bce93a3a3 100755
--- a/plugins/tech/parquet/pom.xml
+++ b/plugins/tech/parquet/pom.xml
@@ -43,7 +43,7 @@
     </licenses>
 
     <properties>
-        <parquet.version>1.12.0</parquet.version>
+        <parquet.version>1.12.3</parquet.version>
         <hadoop.version>2.6.5</hadoop.version>
     </properties>
 
diff --git a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutput.java b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutput.java
index 7ade914c59..e681ded16b 100644
--- a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutput.java
+++ b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutput.java
@@ -269,6 +269,9 @@ public class ParquetOutput extends BaseTransform<ParquetOutputMeta, ParquetOutpu
     if (meta.isFilenameIncludingSplitNr()) {
       filename += "-" + new DecimalFormat("0000").format(data.split);
     }
+    if (data.isBeamContext()) {
+      filename+= "_"+log.getLogChannelId()+"_"+data.getBeamBundleNr();
+    }
     filename += "." + Const.NVL(resolve(meta.getFilenameExtension()), "parquet");
     filename += meta.getCompressionCodec().getExtension();
     return filename;
@@ -281,4 +284,23 @@ public class ParquetOutput extends BaseTransform<ParquetOutputMeta, ParquetOutpu
       throw new HopException("Error closing file " + data.filename, e);
     }
   }
+
+  @Override
+  public void batchComplete() throws HopException {
+    if (!data.isBeamContext()) {
+      closeFile();
+    }
+  }
+
+  @Override
+  public void startBundle() throws HopException {
+    if (!first) {
+      openNewFile();
+    }
+  }
+
+  @Override
+  public void finishBundle() throws HopException {
+    closeFile();
+  }
 }
diff --git a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterFileField.java b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterFileField.java
index d9ff813348..591ce41b63 100644
--- a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterFileField.java
+++ b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterFileField.java
@@ -338,7 +338,7 @@ public class ExcelWriterFileField {
     autosizecolums = false;
     streamingData = false;
     extension = "xls";
-    doNotOpenNewFileInit = false;
+    doNotOpenNewFileInit = true;
     transformNrInFilename = false;
     dateInFilename = false;
     timeInFilename = false;
diff --git a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransform.java b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransform.java
index a0e260a229..e0c80964f6 100644
--- a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransform.java
+++ b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransform.java
@@ -69,9 +69,9 @@ public class ExcelWriterTransform
     // get next row
     Object[] r = getRow();
 
-    // first row initialization
+    // We might have a row here, or we might get a null row if there was no input.
+    //
     if (first) {
-
       first = false;
       if (r == null) {
         data.outputRowMeta = new RowMeta();
@@ -81,10 +81,10 @@ public class ExcelWriterTransform
         data.inputRowMeta = getInputRowMeta().clone();
       }
 
-      // if we are supposed to init the file up front, here we go
+      // If we are supposed to create the file up front regardless of whether we receive input rows
+      // then this is the place to do it.
+      //
       if (!meta.getFile().isDoNotOpenNewFileInit()) {
-        data.firstFileOpened = true;
-
         try {
           prepareNextOutputFile(r);
         } catch (HopException e) {
@@ -100,9 +100,9 @@ public class ExcelWriterTransform
       }
 
       if (r != null) {
-        // if we are supposed to init the file delayed, here we go
+        // If we are supposed to create a file after receiving the first row, we do this here.
+        //
         if (meta.getFile().isDoNotOpenNewFileInit()) {
-          data.firstFileOpened = true;
           prepareNextOutputFile(r);
         }
 
@@ -166,19 +166,25 @@ public class ExcelWriterTransform
 
     if (r != null) {
       // check if the filename has changed between rows
-      if (meta.getFile().isFileNameInField()
-          && !data.currentWorkbookDefinition.getFile().equals(getFileLocation(r))) {
-        // check if the file is already used and switch or create new file
-        boolean fileFound = false;
-        for (int i = 0; i < data.usedFiles.size(); i++) {
-          if (data.usedFiles.get(i).getFile().equals(getFileLocation(r))) {
-            fileFound = true;
-            data.currentWorkbookDefinition = data.usedFiles.get(i);
-            break;
-          }
+      if (meta.getFile().isFileNameInField()) {
+        if (data.isBeamContext()) {
+          throw new HopException(
+              "Storing filenames in an input field is not supported in Beam pipelines");
         }
-        if (!fileFound) {
-          prepareNextOutputFile(r);
+
+        if (!data.currentWorkbookDefinition.getFile().equals(getFileLocation(r))) {
+          // check if the file is already used and switch or create new file
+          boolean fileFound = false;
+          for (int i = 0; i < data.usedFiles.size(); i++) {
+            if (data.usedFiles.get(i).getFile().equals(getFileLocation(r))) {
+              fileFound = true;
+              data.currentWorkbookDefinition = data.usedFiles.get(i);
+              break;
+            }
+          }
+          if (!fileFound) {
+            prepareNextOutputFile(r);
+          }
         }
       }
 
@@ -206,19 +212,19 @@ public class ExcelWriterTransform
       }
       return true;
     } else {
-      closeFilesAndDispose();
+      closeFiles();
+      setOutputDone();
       return false;
     }
   }
 
-  public void closeFilesAndDispose() throws HopException {
+  public void closeFiles() throws HopException {
 
     // Close all files and dispose objects
     for (ExcelWriterWorkbookDefinition workbookDefinition : data.usedFiles) {
       closeOutputFile(workbookDefinition);
     }
     data.usedFiles.clear();
-    setOutputDone();
   }
 
   private void createParentFolder(FileObject filename) throws Exception {
@@ -361,6 +367,7 @@ public class ExcelWriterTransform
       if (xlsRow == null) {
         xlsRow = workbookDefinition.getSheet().createRow(workbookDefinition.getPosY());
       }
+
       Object v = null;
       if (meta.getOutputFields() == null || meta.getOutputFields().isEmpty()) {
         //  Write all values in stream to text file.
@@ -679,7 +686,13 @@ public class ExcelWriterTransform
    * @return current output filename to write to
    */
   public String buildFilename(int splitNr) {
-    return meta.buildFilename(this, getCopy(), splitNr);
+    return meta.buildFilename(
+        this,
+        getCopy(),
+        splitNr,
+        data.isBeamContext(),
+        log.getLogChannelId(),
+        data.getBeamBundleNr());
   }
 
   /**
@@ -704,12 +717,23 @@ public class ExcelWriterTransform
 
   public void prepareNextOutputFile(Object[] row) throws HopException {
     try {
+      // Validation
+      //
       // sheet name shouldn't exceed 31 character
       if (data.realSheetname != null && data.realSheetname.length() > 31) {
         throw new HopException(
             BaseMessages.getString(
                 PKG, "ExcelWriterTransform.Exception.MaxSheetName", data.realSheetname));
       }
+
+      // Getting field names from input is not supported in a Beam context
+      //
+      if (data.isBeamContext() && meta.getFile().isFileNameInField()) {
+        throw new HopException(
+            BaseMessages.getString(
+                PKG, "ExcelWriterTransform.Exception.FilenameFromFieldNotSupportedInBeam"));
+      }
+
       // clear style cache
       int numOfFields =
           meta.getOutputFields() != null && meta.getOutputFields().size() > 0
@@ -904,7 +928,8 @@ public class ExcelWriterTransform
         data.startingCol = 0;
       }
 
-      // calculate starting positions
+      // Calculate the starting positions in the sheet.
+      //
       int posX;
       int posY;
       posX = data.startingCol;
@@ -964,7 +989,7 @@ public class ExcelWriterTransform
     } catch (Exception e) {
       logError("Error opening new file", e);
       setErrors(1);
-      throw new HopException(e);
+      throw new HopException("Error opening new file", e);
     }
   }
 
@@ -1031,18 +1056,29 @@ public class ExcelWriterTransform
     return false;
   }
 
-  /** pipeline run end */
   @Override
-  public void dispose() {
-    // Call one more time to make sure the files are closed in a Beam context
+  public void startBundle() throws HopException {
+    // Generate a new file for the next bundle
     //
-    try {
-      closeFilesAndDispose();
-    } catch (Exception e) {
-      throw new RuntimeException("Error closing output files in Excel writer", e);
+    if (!first) {
+      prepareNextOutputFile(null);
     }
+  }
 
-    super.dispose();
+  @Override
+  public void finishBundle() throws HopException {
+    closeFiles();
+  }
+
+  @Override
+  public void batchComplete() throws HopException {
+    // Call to make sure the files are closed in a Beam context
+    // On Beam the single threader engine works with one row at a time.
+    // We need to keep the file(s) open until the end of the bundle.
+    //
+    if (!data.isBeamContext()) {
+      closeFiles();
+    }
   }
 
   /** Write protect Sheet by setting password works only for xls output at the moment */
diff --git a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformData.java b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformData.java
index 85be37b479..45aa6a932e 100644
--- a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformData.java
+++ b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformData.java
@@ -28,7 +28,6 @@ public class ExcelWriterTransformData extends BaseTransformData implements ITran
   public IRowMeta outputRowMeta;
   public String realSheetname;
   public String realTemplateSheetName;
-  public boolean firstFileOpened;
   public ArrayList<ExcelWriterWorkbookDefinition> usedFiles = new ArrayList<>();
   public ExcelWriterWorkbookDefinition currentWorkbookDefinition;
   public int[] fieldnrs;
diff --git a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformMeta.java b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformMeta.java
index 41d7d6ff80..c4aa5f8f3d 100644
--- a/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformMeta.java
+++ b/plugins/transforms/excel/src/main/java/org/apache/hop/pipeline/transforms/excelwriter/ExcelWriterTransformMeta.java
@@ -333,7 +333,7 @@ public class ExcelWriterTransformMeta
     int i = 0;
     for (int copy = 0; copy < copies; copy++) {
       for (int split = 0; split < splits; split++) {
-        retval[i] = buildFilename(variables, copy, split);
+        retval[i] = buildFilename(variables, copy, split, false, "", 1);
         i++;
       }
     }
@@ -344,7 +344,7 @@ public class ExcelWriterTransformMeta
     return retval;
   }
 
-  public String buildFilename(IVariables variables, int transformnr, int splitnr) {
+  public String buildFilename(IVariables variables, int transformNr, int splitNr, boolean beamContext, String transformId, int bundleNr) {
     SimpleDateFormat daf = new SimpleDateFormat();
 
     // Replace possible environment variables...
@@ -370,10 +370,13 @@ public class ExcelWriterTransformMeta
       }
     }
     if (file.isTransformNrInFilename()) {
-      retval += "_" + transformnr;
+      retval += "_" + transformNr;
     }
     if (file.getSplitEvery() > 0) {
-      retval += "_" + splitnr;
+      retval += "_" + splitNr;
+    }
+    if (beamContext) {
+      retval+= "_"+transformId+"_"+bundleNr;
     }
 
     if (realextension != null && realextension.length() != 0) {
diff --git a/plugins/transforms/excel/src/main/resources/org/apache/hop/pipeline/transforms/excelwriter/messages/messages_en_US.properties b/plugins/transforms/excel/src/main/resources/org/apache/hop/pipeline/transforms/excelwriter/messages/messages_en_US.properties
index cf914d1100..80018b464f 100644
--- a/plugins/transforms/excel/src/main/resources/org/apache/hop/pipeline/transforms/excelwriter/messages/messages_en_US.properties
+++ b/plugins/transforms/excel/src/main/resources/org/apache/hop/pipeline/transforms/excelwriter/messages/messages_en_US.properties
@@ -99,7 +99,7 @@ ExcelWriterDialog.StartingCell.Tooltip=Enter cell reference to start writing at,
 ExcelWriterDialog.ContentTab.TabTitle=Content
 ExcelWriterDialog.IfSheetExists.Tooltip=If the sheet exists in the file you may choose to delete it and replace it with a fresh sheet or reuse the existing sheet for writing.
 ExcelWriterDialog.CreateParentFolder.Label=Create parent folder
-ExcelWriterMeta.Injection.CreateParentFolder.Field=Create parent folder (Y/N)
+ExcelWriterMeta.Injection.CreateParentFolder.Field=Create parent folder
 ExcelWriterDialog.CreateParentFolder.Tooltip=Check this if you want to create parent folder\n when necessary. Otherwise, Apache Hop will throw an exception when parent folder doesn''t exist.
 ExcelWriterDialog.Extension.Tooltip=This is the file extension of the Excel file. It also implies the file format. 
 ExcelWriterDialog.RowWritingMethod.PushDown.Label=shift existing cells down
@@ -130,6 +130,7 @@ ExcelWriterDialog.StreamData.Label=Stream XSLX data
 ExcelWriterTransform.Exception.MaxSheetName=Sheet name exceeds 31 character: {0}
 ExcelWriterTransform.Exception.TemplateNotFound=Template Sheet: {0} not found, aborting
 ExcelWriterTransform.Exception.CouldNotPrepareFile=Could not prepare output file {0}
+ExcelWriterTransform.Exception.FilenameFromFieldNotSupportedInBeam=Getting the filename from a field is not support when running in Beam.
 ExcelWriterDialog.TemplateSheetHide.Label=Hide Template Sheet
 ExcelWriterDialog.TemplateSheetHide.Tooltip=Ensure that the template sheet will be hidden
 ExcelWriterMeta.Tab.Sheetname.Text=Sheet1
@@ -140,50 +141,50 @@ ExcelWriterDialog.FailedToGetFields.DialogMessage=We can not get fields name fro
 ExcelWriterMeta.Injection.FileName.Field=Filename
 ExcelWriterMeta.Injection.IfFileExists.Field=If output file exists (reuse/new)?
 ExcelWriterMeta.Injection.IfSheetExists.Field=If sheet exists in output file (reuse/new)?
-ExcelWriterMeta.Injection.MakeSheetActive.Field=Make this the active sheet (Y/N)?
-ExcelWriterMeta.Injection.ForceFormulaRecalculation.Field=Force formula recalculation (Y/N)?
-ExcelWriterMeta.Injection.LeaveExistingStylesUnchanged.Field=Leave styles of existing cells unchanged (Y/N)?
+ExcelWriterMeta.Injection.MakeSheetActive.Field=Make this the active sheet?
+ExcelWriterMeta.Injection.ForceFormulaRecalculation.Field=Force formula recalculation?
+ExcelWriterMeta.Injection.LeaveExistingStylesUnchanged.Field=Leave styles of existing cells unchanged?
 ExcelWriterMeta.Injection.AppendOffset.Field=Offset by ... rows
 ExcelWriterMeta.Injection.AppendEmpty.Field=Begin by writing ... empty lines
-ExcelWriterMeta.Injection.AppendOmitHeader.Field=Omit header (Y/N)?
+ExcelWriterMeta.Injection.AppendOmitHeader.Field=Omit header?
 ExcelWriterMeta.Injection.RowWritingMethod.Field=Row writing method (overwrite/pus)
 ExcelWriterMeta.Injection.StartingCell.Field=Start writing at cell
 ExcelWriterMeta.Injection.Extension.Field=Extension (xls/xlsx)
 ExcelWriterMeta.Injection.Password.Field=Password
 ExcelWriterMeta.Injection.ProtectedBy.Field=Protected by user
-ExcelWriterMeta.Injection.HeaderEnabled.Field=Write header (Y/N)?
-ExcelWriterMeta.Injection.FooterEnabled.Field=Write footer (Y/N)?
+ExcelWriterMeta.Injection.HeaderEnabled.Field=Write header?
+ExcelWriterMeta.Injection.FooterEnabled.Field=Write footer?
 ExcelWriterMeta.Injection.SplitEvery.Field=Split every ... data rows
-ExcelWriterMeta.Injection.TransformNrInFilename.Field=Include transform nr in filename (Y/N)?
-ExcelWriterMeta.Injection.DateInFilename.Field=Include date in filename (Y/N)?
-ExcelWriterMeta.Injection.AddToResultFilenames.Field=Add filenames to result (Y/N)?
-ExcelWriterMeta.Injection.ProtectSheet.Field=Protect sheet (XLS format only) (Y/N)?
-ExcelWriterMeta.Injection.TimeInFilename.Field=Include time in filename (Y/N)?
-ExcelWriterMeta.Injection.TemplateEnabled.Field=Use template when creating new files (Y/N)?
-ExcelWriterMeta.Injection.TemplateSheetEnabled.Field=Use template when creating new sheets (Y/N)?
-ExcelWriterMeta.Injection.TemplateSheetHidden.Field=Hide template sheet (Y/N)?
+ExcelWriterMeta.Injection.TransformNrInFilename.Field=Include transform split nr in filename?
+ExcelWriterMeta.Injection.DateInFilename.Field=Include date in filename?
+ExcelWriterMeta.Injection.AddToResultFilenames.Field=Add filenames to result?
+ExcelWriterMeta.Injection.ProtectSheet.Field=Protect sheet (XLS format only)?
+ExcelWriterMeta.Injection.TimeInFilename.Field=Include time in filename?
+ExcelWriterMeta.Injection.TemplateEnabled.Field=Use template when creating new files?
+ExcelWriterMeta.Injection.TemplateSheetEnabled.Field=Use template when creating new sheets?
+ExcelWriterMeta.Injection.TemplateSheetHidden.Field=Hide template sheet?
 ExcelWriterMeta.Injection.TemplateFileName.Field=Template file
 ExcelWriterMeta.Injection.TemplateSheetName.Field=Template sheet name
 ExcelWriterMeta.Injection.SheetName.Field=Sheet name
-ExcelWriterMeta.Injection.AppendLines.Field=Append lines at the end of sheet (Y/N)?
-ExcelWriterMeta.Injection.DoNotOpenNewFileInit.Field=Wait for first row before creating new file (Y/N)?
-ExcelWriterMeta.Injection.SpecifyFormat.Field=Specify date/time format (Y/N)?
+ExcelWriterMeta.Injection.AppendLines.Field=Append lines at the end of sheet?
+ExcelWriterMeta.Injection.DoNotOpenNewFileInit.Field=Wait for first row before creating new file?
+ExcelWriterMeta.Injection.SpecifyFormat.Field=Specify date/time format?
 ExcelWriterMeta.Injection.DateTimeFormat.Field=Date time format field
-ExcelWriterMeta.Injection.AutoSizeColums.Field=Auto size columns (Y/N)
-ExcelWriterMeta.Injection.StreamingData.Field=Stream XLSX data (Y/N)?
+ExcelWriterMeta.Injection.AutoSizeColums.Field=Auto size columns
+ExcelWriterMeta.Injection.StreamingData.Field=Stream XLSX data?
 ExcelWriterMeta.Injection.Fields=Fields
 ExcelWriterMeta.Injection.Field=Field
 ExcelWriterMeta.Injection.Output.FieldName.Field=Field name
 ExcelWriterMeta.Injection.Output.Type.Field=Type
 ExcelWriterMeta.Injection.Output.Format.Field=Format
 ExcelWriterMeta.Injection.Output.Title.Field=Field title
-ExcelWriterMeta.Injection.Output.FieldContainFormula.Field=Field contains formula (Y/N)?
+ExcelWriterMeta.Injection.Output.FieldContainFormula.Field=Field contains formula?
 ExcelWriterMeta.Injection.Output.Hyperlink.Field=Hyperlink
 ExcelWriterMeta.Injection.Output.Comment.Field=Cell comment (XLSX)
 ExcelWriterMeta.Injection.Output.CommentAuthor.Field=Cell comment author (XLSX)
 ExcelWriterMeta.Injection.Output.TitleStyleCell.Field=Header/footer style from cell
 ExcelWriterMeta.Injection.Output.StyleCell.Field=Style from cell
-ExcelWriterMeta.Injection.FilenameInField.Field=Filename in field (Y/N)?
+ExcelWriterMeta.Injection.FilenameInField.Field=Filename in field?
 ExcelWriterMeta.Injection.FilenameField.Field=Filename field
 ExcelWriterTransformMeta.keyword=excel,writer,transform
 ExcelWriter.Log.ParentFolderNotExistCreateIt=We can not find folder [{0}]! You need to create it before generating file [{1}].
diff --git a/plugins/transforms/javascript/src/main/java/org/apache/hop/pipeline/transforms/javascript/ScriptValuesDummy.java b/plugins/transforms/javascript/src/main/java/org/apache/hop/pipeline/transforms/javascript/ScriptValuesDummy.java
index 0897d09f7d..706429c699 100644
--- a/plugins/transforms/javascript/src/main/java/org/apache/hop/pipeline/transforms/javascript/ScriptValuesDummy.java
+++ b/plugins/transforms/javascript/src/main/java/org/apache/hop/pipeline/transforms/javascript/ScriptValuesDummy.java
@@ -346,69 +346,54 @@ public class ScriptValuesDummy implements ITransform {
 
   @Override
   public boolean isRunning() {
-    // TODO Auto-generated method stub
     return false;
   }
 
   public boolean isUsingThreadPriorityManagment() {
-    // TODO Auto-generated method stub
     return false;
   }
 
   public void setUsingThreadPriorityManagment(boolean usingThreadPriorityManagment) {
-    // TODO Auto-generated method stub
-
   }
 
   @Override
   public void setRunning(boolean running) {
-    // TODO Auto-generated method stub
-
   }
 
   @Override
   public void setStopped(boolean stopped) {
-    // TODO Auto-generated method stub
-
   }
 
   @Override
   public void setSafeStopped(boolean stopped) {
-    // TODO Auto-generated method stub
   }
 
   @Override
   public int rowsetInputSize() {
-    // TODO Auto-generated method stub
     return 0;
   }
 
   @Override
   public int rowsetOutputSize() {
-    // TODO Auto-generated method stub
     return 0;
   }
 
   @Override
   public long getProcessed() {
-    // TODO Auto-generated method stub
     return 0;
   }
 
   @Override
   public Map<String, ResultFile> getResultFiles() {
-    // TODO Auto-generated method stub
     return null;
   }
 
   public long getRuntime() {
-    // TODO Auto-generated method stub
     return 0;
   }
 
   @Override
   public EngineComponent.ComponentExecutionStatus getStatus() {
-    // TODO Auto-generated method stub
     return null;
   }
 
@@ -429,31 +414,23 @@ public class ScriptValuesDummy implements ITransform {
 
   @Override
   public boolean isPaused() {
-    // TODO Auto-generated method stub
     return false;
   }
 
   @Override
   public void identifyErrorOutput() {
-    // TODO Auto-generated method stub
-
   }
 
   @Override
   public void setPartitioned(boolean partitioned) {
-    // TODO Auto-generated method stub
-
   }
 
   @Override
   public void setRepartitioning(int partitioningMethod) {
-    // TODO Auto-generated method stub
-
   }
 
   @Override
   public boolean canProcessOneRow() {
-    // TODO Auto-generated method stub
     return false;
   }
 
@@ -463,68 +440,60 @@ public class ScriptValuesDummy implements ITransform {
   }
 
   public boolean isWaitingForData() {
-    // TODO Auto-generated method stub
     return false;
   }
 
   public void setWaitingForData(boolean waitingForData) {
-    // TODO Auto-generated method stub
   }
 
   public boolean isIdle() {
-    // TODO Auto-generated method stub
     return false;
   }
 
   public boolean isPassingData() {
-    // TODO Auto-generated method stub
     return false;
   }
 
   public void setPassingData(boolean passingData) {
-    // TODO Auto-generated method stub
-
   }
 
   @Override
   public void batchComplete() throws HopException {
-    // TODO Auto-generated method stub
   }
 
   @Override
-  public void setMetadataProvider(IHopMetadataProvider metadataProvider) {
-    // TODO Auto-generated method stub
+  public void startBundle() throws HopException {
+  }
+
+  @Override
+  public void finishBundle() throws HopException {
+  }
 
+  @Override
+  public void setMetadataProvider(IHopMetadataProvider metadataProvider) {
   }
 
   @Override
   public IHopMetadataProvider getMetadataProvider() {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public int getCurrentInputRowSetNr() {
-    // TODO Auto-generated method stub
     return 0;
   }
 
   @Override
   public void setCurrentOutputRowSetNr(int index) {
-    // TODO Auto-generated method stub
-
   }
 
   @Override
   public int getCurrentOutputRowSetNr() {
-    // TODO Auto-generated method stub
     return 0;
   }
 
   @Override
   public void setCurrentInputRowSetNr(int index) {
-    // TODO Auto-generated method stub
-
   }
 
   @Override
diff --git a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/BaseFileOutputMeta.java b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/BaseFileOutputMeta.java
index ec8d602efb..d1f9255716 100644
--- a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/BaseFileOutputMeta.java
+++ b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/BaseFileOutputMeta.java
@@ -26,17 +26,15 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 
 /** A base implementation for all output file based metas. */
-public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends JsonOutputData> extends BaseTransformMeta<Main, Data> {
+public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends JsonOutputData>
+    extends BaseTransformMeta<Main, Data> {
 
   /** Flag: add the transformnr in the filename */
-  @HopMetadataProperty(
-        injectionKeyDescription = "JsonOutput.Injection.INC_TRANSFORMNR_IN_FILENAME"
-  )
+  @HopMetadataProperty(injectionKeyDescription = "JsonOutput.Injection.INC_TRANSFORMNR_IN_FILENAME")
   protected boolean transformNrInFilename;
 
   /** Flag: add the partition number in the filename */
-  @HopMetadataProperty(
-          injectionKeyDescription = "JsonOutput.Injection.INC_PARTNR_IN_FILENAME")
+  @HopMetadataProperty(injectionKeyDescription = "JsonOutput.Injection.INC_PARTNR_IN_FILENAME")
   protected boolean partNrInFilename;
 
   /** Flag: add the date in the filename */
@@ -131,7 +129,7 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
     return partNrInFilename;
   }
 
-  public void setPartNrInFilename(boolean partNrInFilename){
+  public void setPartNrInFilename(boolean partNrInFilename) {
     this.partNrInFilename = partNrInFilename;
   }
 
@@ -139,7 +137,7 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
     return transformNrInFilename;
   }
 
-  public void setTransformNrInFilename(boolean transformNrInFilename){
+  public void setTransformNrInFilename(boolean transformNrInFilename) {
     this.transformNrInFilename = transformNrInFilename;
   }
 
@@ -202,6 +200,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
                     transform + "",
                     getPartPrefix() + part,
                     split + "",
+                    false,
+                    "",
+                    0,
                     now,
                     false,
                     showSamples);
@@ -222,6 +223,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
             "<transform>",
             "<partition>",
             "<split>",
+            false,
+            "",
+            0,
             now,
             false,
             showSamples)
@@ -238,8 +242,20 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
       final String copyNr,
       final String partitionNr,
       final String splitNr,
+      final boolean beamContext,
+      final String transformId,
+      final int bundleNr,
       final boolean ziparchive) {
-    return buildFilename(variables, copyNr, partitionNr, splitNr, ziparchive, true);
+    return buildFilename(
+        variables,
+        copyNr,
+        partitionNr,
+        splitNr,
+        beamContext,
+        transformId,
+        bundleNr,
+        ziparchive,
+        true);
   }
 
   public String buildFilename(
@@ -247,6 +263,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
       final String transformnr,
       final String partnr,
       final String splitnr,
+      final boolean beamContext,
+      final String transformId,
+      final int bundleNr,
       final boolean ziparchive,
       final boolean showSamples) {
 
@@ -259,6 +278,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
         transformnr,
         partnr,
         splitnr,
+        beamContext,
+        transformId,
+        bundleNr,
         new Date(),
         ziparchive,
         showSamples);
@@ -270,6 +292,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
       final String transformnr,
       final String partnr,
       final String splitnr,
+      final boolean beamContext,
+      final String transformId,
+      final int bundleNr,
       final Date date,
       final boolean ziparchive,
       final boolean showSamples) {
@@ -279,6 +304,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
         transformnr,
         partnr,
         splitnr,
+        beamContext,
+        transformId,
+        bundleNr,
         date,
         ziparchive,
         showSamples,
@@ -291,6 +319,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
       final String transformnr,
       final String partnr,
       final String splitnr,
+      final boolean beamContext,
+      final String transformId,
+      final int bundleNr,
       final Date date,
       final boolean ziparchive,
       final boolean showSamples,
@@ -302,6 +333,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
         transformnr,
         partnr,
         splitnr,
+        beamContext,
+        transformId,
+        bundleNr,
         date,
         ziparchive,
         showSamples,
@@ -315,6 +349,9 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
       final String transformnr,
       final String partnr,
       final String splitnr,
+      final boolean beamContext,
+      final String transformId,
+      final int bundleNr,
       final Date date,
       final boolean ziparchive,
       final boolean showSamples,
@@ -365,6 +402,10 @@ public abstract class BaseFileOutputMeta<Main extends JsonOutput, Data extends J
       retval += "_" + splitnr;
     }
 
+    if (beamContext) {
+      retval += "_" + transformId + "_" + bundleNr;
+    }
+
     if ("Zip".equals(meta.getFileCompression())) {
       if (ziparchive) {
         retval += ".zip";
diff --git a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutput.java b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutput.java
index 662d6b87f6..df4dcc91e9 100644
--- a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutput.java
+++ b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutput.java
@@ -113,7 +113,7 @@ public class JsonOutput extends BaseTransform<JsonOutputMeta, JsonOutputData> {
 
       if (data.nrRowsInBloc > 0 && data.nrRow % data.nrRowsInBloc == 0) {
         // We can now output an object
-        outPutRow(row);
+        outputRow(row);
       }
     }
   }
@@ -165,7 +165,7 @@ public class JsonOutput extends BaseTransform<JsonOutputMeta, JsonOutputData> {
 
       if (data.nrRowsInBloc > 0 && data.nrRow % data.nrRowsInBloc == 0) {
         // We can now output an object
-        outPutRow(row);
+        outputRow(row);
       }
     }
   }
@@ -176,11 +176,7 @@ public class JsonOutput extends BaseTransform<JsonOutputMeta, JsonOutputData> {
   public boolean processRow() throws HopException {
     Object[] r = getRow(); // This also waits for a row to be finished.
     if (r == null) {
-      // no more input to be expected...
-      if (!data.rowsAreSafe) {
-        // Let's output the remaining unsafe data
-        outPutRow(r);
-      }
+      writeJsonToFile();
 
       setOutputDone();
       return false;
@@ -220,14 +216,22 @@ public class JsonOutput extends BaseTransform<JsonOutputMeta, JsonOutputData> {
     return true;
   }
 
+  private void writeJsonToFile() throws HopTransformException {
+    // no more input to be expected...
+    if (!data.rowsAreSafe) {
+      // Let's output the remaining unsafe data
+      outputRow(null);
+    }
+  }
+
   @SuppressWarnings("unchecked")
-  private void outPutRow(Object[] rowData) throws HopTransformException {
+  private void outputRow(Object[] rowData) throws HopTransformException {
     // We can now output an object
     data.jg = new JSONObject();
     data.jg.put(data.realBlocName, data.ja);
     String value = data.jg.toJSONString();
 
-    if (data.outputValue && data.outputRowMeta != null) {
+    if (rowData != null && data.outputValue && data.outputRowMeta != null) {
       Object[] outputRowData = RowDataUtil.addValueData(rowData, data.inputRowMetaSize, value);
       incrementLinesOutput();
       putRow(data.outputRowMeta, outputRowData);
@@ -300,7 +304,7 @@ public class JsonOutput extends BaseTransform<JsonOutputMeta, JsonOutputData> {
       //
       if (data.ja.size() > 0) {
         try {
-          outPutRow(null);
+          outputRow(null);
         } catch (Exception e) {
           log.logError("Error writing final rows to disk", e);
         }
@@ -314,6 +318,25 @@ public class JsonOutput extends BaseTransform<JsonOutputMeta, JsonOutputData> {
     super.dispose();
   }
 
+  @Override
+  public void startBundle() throws HopException {
+    if (!first) {
+      openNewFile();
+    }
+  }
+
+  @Override
+  public void batchComplete() throws HopException {
+    if (!data.isBeamContext()) {
+      writeJsonToFile();
+    }
+  }
+
+  @Override
+  public void finishBundle() throws HopException {
+    writeJsonToFile();
+  }
+
   private void createParentFolder(String filename) throws HopTransformException {
     if (!meta.isCreateParentFolder()) {
       return;
@@ -399,7 +422,15 @@ public class JsonOutput extends BaseTransform<JsonOutputMeta, JsonOutputData> {
   }
 
   public String buildFilename() {
-    return meta.buildFilename(variables, getCopy() + "", null, data.splitnr + "", false);
+    return meta.buildFilename(
+        variables,
+        getCopy() + "",
+        null,
+        data.splitnr + "",
+        data.isBeamContext(),
+        log.getLogChannelId(),
+        data.getBeamBundleNr(),
+        false);
   }
 
   protected boolean closeFile() {
diff --git a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutputMeta.java b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutputMeta.java
index c56252d3f8..b6b1ad7ac8 100644
--- a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutputMeta.java
+++ b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsonoutput/JsonOutputMeta.java
@@ -115,6 +115,7 @@ public class JsonOutputMeta extends BaseFileOutputMeta<JsonOutput, JsonOutputDat
   @HopMetadataProperty(injectionKeyDescription = "JsonOutput.Injection.CREATE_PARENT_FOLDER")
   private boolean createParentFolder;
 
+  @HopMetadataProperty(injectionKeyDescription = "JsonOutput.Injection.DONT_CREATE_AT_START")
   private boolean doNotOpenNewFileInit;
 
   public JsonOutputMeta() {
@@ -222,6 +223,7 @@ public class JsonOutputMeta extends BaseFileOutputMeta<JsonOutput, JsonOutputDat
     nrRowsInBloc = "1";
     operationType = OPERATION_TYPE_WRITE_TO_FILE;
     extension = "json";
+    doNotOpenNewFileInit=true;
     int nrFields = 0;
 
     for (int i = 0; i < nrFields; i++) {
diff --git a/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRows.java b/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRows.java
index 755eae0d06..92143d99a9 100644
--- a/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRows.java
+++ b/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRows.java
@@ -626,6 +626,10 @@ public class SortRows extends BaseTransform<SortRowsMeta, SortRowsData> {
     }
   }
 
+  @Override
+  public void startBundle() throws HopException {
+  }
+
   /**
    * Calling this method will alert the transform that we finished passing records to the transform.
    * Specifically for transforms like "Sort Rows" it means that the buffered rows can be sorted and
@@ -633,6 +637,15 @@ public class SortRows extends BaseTransform<SortRowsMeta, SortRowsData> {
    */
   @Override
   public void batchComplete() throws HopException {
+    if (!data.isBeamContext()) {
+      preSortBeforeFlush();
+      passBuffer();
+      setOutputDone();
+    }
+  }
+
+  @Override
+  public void finishBundle() throws HopException {
     preSortBeforeFlush();
     passBuffer();
     setOutputDone();
diff --git a/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRowsData.java b/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRowsData.java
index b4b804d17f..eb4b8d534f 100644
--- a/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRowsData.java
+++ b/plugins/transforms/sort/src/main/java/org/apache/hop/pipeline/transforms/sort/SortRowsData.java
@@ -51,7 +51,7 @@ public class SortRowsData extends BaseTransformData implements ITransformData {
   public int[] convertKeysToNative;
   public boolean convertAnyKeysToNative;
 
-  Comparator<RowTempFile> comparator;
+    Comparator<RowTempFile> comparator;
   Comparator<Object[]> rowComparator;
 
   public int freeCounter;
diff --git a/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutput.java b/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutput.java
index 5ff88b7092..aa08142016 100644
--- a/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutput.java
+++ b/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutput.java
@@ -769,6 +769,9 @@ public class TextFileOutput<Meta extends TextFileOutputMeta, Data extends TextFi
         getCopy(),
         getPartitionId(),
         data.splitnr,
+        data.isBeamContext(),
+        log.getLogChannelId(),
+        data.getBeamBundleNr(),
         ziparchive,
         meta);
   }
@@ -902,6 +905,7 @@ public class TextFileOutput<Meta extends TextFileOutputMeta, Data extends TextFi
   protected void close() throws IOException {
     if (!meta.isServletOutput()) {
       data.getFileStreamsCollection().flushOpenFiles(true);
+      data.writer = null;
     }
   }
 
@@ -1031,4 +1035,28 @@ public class TextFileOutput<Meta extends TextFileOutputMeta, Data extends TextFi
       throws HopFileException {
     return HopVfs.getOutputStream(vfsFilename, append);
   }
+
+  @Override
+  public void startBundle() throws HopException {
+  }
+
+  @Override
+  public void batchComplete() throws HopException {
+    if (!data.isBeamContext()) {
+      try {
+        close();
+      } catch (IOException e) {
+        throw new HopException("Error closing file(s)", e);
+      }
+    }
+  }
+
+  @Override
+  public void finishBundle() throws HopException {
+    try {
+      close();
+    } catch (IOException e) {
+      throw new HopException("Error closing file(s)", e);
+    }
+  }
 }
diff --git a/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMeta.java b/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMeta.java
index f9d7f9fb08..3d3bb05f4d 100644
--- a/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMeta.java
+++ b/plugins/transforms/textfile/src/main/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMeta.java
@@ -227,149 +227,207 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
     this.servletOutput = servletOutput;
   }
 
-  /** @param createparentfolder The createparentfolder to set. */
+  /**
+   * @param createparentfolder The createparentfolder to set.
+   */
   public void setCreateParentFolder(boolean createparentfolder) {
     this.createparentfolder = createparentfolder;
   }
 
-  /** @return Returns the createparentfolder. */
+  /**
+   * @return Returns the createparentfolder.
+   */
   public boolean isCreateParentFolder() {
     return createparentfolder;
   }
 
-  /** @param dateInFilename The dateInFilename to set. */
+  /**
+   * @param dateInFilename The dateInFilename to set.
+   */
   public void setDateInFilename(boolean dateInFilename) {
     this.dateInFilename = dateInFilename;
   }
 
-  /** @return Returns the enclosure. */
+  /**
+   * @return Returns the enclosure.
+   */
   public String getEnclosure() {
     return enclosure;
   }
 
-  /** @param enclosure The enclosure to set. */
+  /**
+   * @param enclosure The enclosure to set.
+   */
   public void setEnclosure(String enclosure) {
     this.enclosure = enclosure;
   }
 
-  /** @return Returns the enclosureForced. */
+  /**
+   * @return Returns the enclosureForced.
+   */
   public boolean isEnclosureForced() {
     return enclosureForced;
   }
 
-  /** @param enclosureForced The enclosureForced to set. */
+  /**
+   * @param enclosureForced The enclosureForced to set.
+   */
   public void setEnclosureForced(boolean enclosureForced) {
     this.enclosureForced = enclosureForced;
   }
 
-  /** @return Returns the enclosureFixDisabled. */
+  /**
+   * @return Returns the enclosureFixDisabled.
+   */
   public boolean isEnclosureFixDisabled() {
     return disableEnclosureFix;
   }
 
-  /** @param disableEnclosureFix The enclosureFixDisabled to set. */
+  /**
+   * @param disableEnclosureFix The enclosureFixDisabled to set.
+   */
   public void setEnclosureFixDisabled(boolean disableEnclosureFix) {
     this.disableEnclosureFix = disableEnclosureFix;
   }
 
-  /** @return Returns the add to result filesname. */
+  /**
+   * @return Returns the add to result filesname.
+   */
   public boolean isAddToResultFiles() {
     return addToResultFilenames;
   }
 
-  /** @param addtoresultfilenamesin The addtoresultfilenames to set. */
+  /**
+   * @param addtoresultfilenamesin The addtoresultfilenames to set.
+   */
   public void setAddToResultFiles(boolean addtoresultfilenamesin) {
     this.addToResultFilenames = addtoresultfilenamesin;
   }
 
-  /** @return Returns the fileAppended. */
+  /**
+   * @return Returns the fileAppended.
+   */
   public boolean isFileAppended() {
     return fileAppended;
   }
 
-  /** @param fileAppended The fileAppended to set. */
+  /**
+   * @param fileAppended The fileAppended to set.
+   */
   public void setFileAppended(boolean fileAppended) {
     this.fileAppended = fileAppended;
   }
 
-  /** @return Returns the fileFormat. */
+  /**
+   * @return Returns the fileFormat.
+   */
   public String getFileFormat() {
     return fileFormat;
   }
 
-  /** @param fileFormat The fileFormat to set. */
+  /**
+   * @param fileFormat The fileFormat to set.
+   */
   @Injection(name = "FORMAT")
   public void setFileFormat(String fileFormat) {
     this.fileFormat = fileFormat;
     this.newline = getNewLine(fileFormat);
   }
 
-  /** @return Returns the footer. */
+  /**
+   * @return Returns the footer.
+   */
   public boolean isFooterEnabled() {
     return footerEnabled;
   }
 
-  /** @param footer The footer to set. */
+  /**
+   * @param footer The footer to set.
+   */
   public void setFooterEnabled(boolean footer) {
     this.footerEnabled = footer;
   }
 
-  /** @return Returns the header. */
+  /**
+   * @return Returns the header.
+   */
   public boolean isHeaderEnabled() {
     return headerEnabled;
   }
 
-  /** @param header The header to set. */
+  /**
+   * @param header The header to set.
+   */
   public void setHeaderEnabled(boolean header) {
     this.headerEnabled = header;
   }
 
-  /** @return Returns the newline. */
+  /**
+   * @return Returns the newline.
+   */
   public String getNewline() {
     return newline;
   }
 
-  /** @param newline The newline to set. */
+  /**
+   * @param newline The newline to set.
+   */
   public void setNewline(String newline) {
     this.newline = newline;
   }
 
-  /** @return Returns the padded. */
+  /**
+   * @return Returns the padded.
+   */
   public boolean isPadded() {
     return padded;
   }
 
-  /** @param padded The padded to set. */
+  /**
+   * @param padded The padded to set.
+   */
   public void setPadded(boolean padded) {
     this.padded = padded;
   }
 
-  /** @return Returns the fastDump. */
+  /**
+   * @return Returns the fastDump.
+   */
   public boolean isFastDump() {
     return fastDump;
   }
 
-  /** @param fastDump The fastDump to set. */
+  /**
+   * @param fastDump The fastDump to set.
+   */
   public void setFastDump(boolean fastDump) {
     this.fastDump = fastDump;
   }
 
-  /** @return Returns the separator. */
+  /**
+   * @return Returns the separator.
+   */
   public String getSeparator() {
     return separator;
   }
 
-  /** @param separator The separator to set. */
+  /**
+   * @param separator The separator to set.
+   */
   public void setSeparator(String separator) {
     this.separator = separator;
   }
 
-  /** @return Returns the "do not open new file at init" flag. */
+  /**
+   * @return Returns the "do not open new file at init" flag.
+   */
   public boolean isDoNotOpenNewFileInit() {
     return doNotOpenNewFileInit;
   }
 
-  /** @param doNotOpenNewFileInit The "do not open new file at init" flag to set. */
+  /**
+   * @param doNotOpenNewFileInit The "do not open new file at init" flag to set.
+   */
   public void setDoNotOpenNewFileInit(boolean doNotOpenNewFileInit) {
     this.doNotOpenNewFileInit = doNotOpenNewFileInit;
   }
@@ -378,7 +436,7 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
    * @return Returns the splitEvery.
    * @deprecated use {@link #getSplitEvery(IVariables)} or {@link #getSplitEveryRows()}
    */
-  @Deprecated(since="2.0")
+  @Deprecated(since = "2.0")
   public int getSplitEvery() {
     return Const.toInt(splitEveryRows, 0);
   }
@@ -391,17 +449,23 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
     return Const.toInt(varSpace == null ? splitEveryRows : varSpace.resolve(splitEveryRows), 0);
   }
 
-  /** @return At how many rows to split into a new file. */
+  /**
+   * @return At how many rows to split into a new file.
+   */
   public String getSplitEveryRows() {
     return splitEveryRows;
   }
 
-  /** @param value At how many rows to split into a new file. */
+  /**
+   * @param value At how many rows to split into a new file.
+   */
   public void setSplitEveryRows(String value) {
     splitEveryRows = value;
   }
 
-  /** @return <tt>1</tt> if <tt>isFooterEnabled()</tt> and <tt>0</tt> otherwise */
+  /**
+   * @return <tt>1</tt> if <tt>isFooterEnabled()</tt> and <tt>0</tt> otherwise
+   */
   public int getFooterShift() {
     return isFooterEnabled() ? 1 : 0;
   }
@@ -410,32 +474,42 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
    * @param splitEvery The splitEvery to set.
    * @deprecated use {@link #setSplitEveryRows(String)}
    */
-  @Deprecated(since="2.0")
+  @Deprecated(since = "2.0")
   public void setSplitEvery(int splitEvery) {
     splitEveryRows = Integer.toString(splitEvery);
   }
 
-  /** @param transformNrInFilename The transformNrInFilename to set. */
+  /**
+   * @param transformNrInFilename The transformNrInFilename to set.
+   */
   public void setTransformNrInFilename(boolean transformNrInFilename) {
     this.transformNrInFilename = transformNrInFilename;
   }
 
-  /** @param partNrInFilename The partNrInFilename to set. */
+  /**
+   * @param partNrInFilename The partNrInFilename to set.
+   */
   public void setPartNrInFilename(boolean partNrInFilename) {
     this.partNrInFilename = partNrInFilename;
   }
 
-  /** @param timeInFilename The timeInFilename to set. */
+  /**
+   * @param timeInFilename The timeInFilename to set.
+   */
   public void setTimeInFilename(boolean timeInFilename) {
     this.timeInFilename = timeInFilename;
   }
 
-  /** @return Returns the outputFields. */
+  /**
+   * @return Returns the outputFields.
+   */
   public TextFileField[] getOutputFields() {
     return outputFields;
   }
 
-  /** @param outputFields The outputFields to set. */
+  /**
+   * @param outputFields The outputFields to set.
+   */
   public void setOutputFields(TextFileField[] outputFields) {
     this.outputFields = outputFields;
   }
@@ -456,7 +530,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
     this.encoding = encoding;
   }
 
-  /** @return The desired last line in the output file, null or empty if nothing has to be added. */
+  /**
+   * @return The desired last line in the output file, null or empty if nothing has to be added.
+   */
   public String getEndedLine() {
     return endedLine;
   }
@@ -469,22 +545,30 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
     this.endedLine = endedLine;
   }
 
-  /** @return Is the file name coded in a field? */
+  /**
+   * @return Is the file name coded in a field?
+   */
   public boolean isFileNameInField() {
     return fileNameInField;
   }
 
-  /** @param fileNameInField Is the file name coded in a field? */
+  /**
+   * @param fileNameInField Is the file name coded in a field?
+   */
   public void setFileNameInField(boolean fileNameInField) {
     this.fileNameInField = fileNameInField;
   }
 
-  /** @return The field name that contains the output file name. */
+  /**
+   * @return The field name that contains the output file name.
+   */
   public String getFileNameField() {
     return fileNameField;
   }
 
-  /** @param fileNameField Name of the field that contains the file name */
+  /**
+   * @param fileNameField Name of the field that contains the file name
+   */
   public void setFileNameField(String fileNameField) {
     this.fileNameField = fileNameField;
   }
@@ -665,7 +749,7 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
     setFileCompression(fileCompressionTypeCodes[FILE_COMPRESSION_TYPE_NONE]);
     fileName = "file";
     servletOutput = false;
-    doNotOpenNewFileInit = false;
+    doNotOpenNewFileInit = true;
     extension = "txt";
     transformNrInFilename = false;
     partNrInFilename = false;
@@ -705,6 +789,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
       int transformnr,
       String partnr,
       int splitnr,
+      boolean beamContext,
+      String transformId,
+      int bundleNr,
       boolean ziparchive,
       TextFileOutputMeta meta) {
 
@@ -717,6 +804,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
         Integer.toString(transformnr),
         partnr,
         Integer.toString(splitnr),
+        beamContext,
+        transformId,
+        bundleNr,
         new Date(),
         ziparchive,
         true,
@@ -1084,6 +1174,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
                     transform + "",
                     getPartPrefix() + part,
                     split + "",
+                    false,
+                    "",
+                    1,
                     now,
                     false,
                     showSamples);
@@ -1104,6 +1197,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
             "<transform>",
             "<partition>",
             "<split>",
+            false,
+            "",
+            1,
             now,
             false,
             showSamples)
@@ -1115,43 +1211,15 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
     return "";
   }
 
-  public String buildFilename(
-      final IVariables variables,
-      final String transformnr,
-      final String partnr,
-      final String splitnr,
-      final boolean ziparchive) {
-    return buildFilename(variables, transformnr, partnr, splitnr, ziparchive, true);
-  }
-
-  public String buildFilename(
-      final IVariables variables,
-      final String transformnr,
-      final String partnr,
-      final String splitnr,
-      final boolean ziparchive,
-      final boolean showSamples) {
-
-    String realFileName = variables.resolve(fileName);
-    String realExtension = variables.resolve(extension);
-
-    return buildFilename(
-        realFileName,
-        realExtension,
-        transformnr,
-        partnr,
-        splitnr,
-        new Date(),
-        ziparchive,
-        showSamples);
-  }
-
   private String buildFilename(
       final String realFileName,
       final String realExtension,
       final String transformnr,
       final String partnr,
       final String splitnr,
+      final boolean beamContext,
+      final String transformId,
+      final int bundleNr,
       final Date date,
       final boolean ziparchive,
       final boolean showSamples) {
@@ -1161,6 +1229,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
         transformnr,
         partnr,
         splitnr,
+        beamContext,
+        transformId,
+        bundleNr,
         date,
         ziparchive,
         showSamples,
@@ -1173,6 +1244,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
       final String transformnr,
       final String partnr,
       final String splitnr,
+      final boolean beamContext,
+      final String transformId,
+      final int bundleNr,
       final Date date,
       final boolean ziparchive,
       final boolean showSamples,
@@ -1184,6 +1258,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
         transformnr,
         partnr,
         splitnr,
+        beamContext,
+        transformId,
+        bundleNr,
         date,
         ziparchive,
         showSamples,
@@ -1197,6 +1274,9 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
       final String transformnr,
       final String partnr,
       final String splitnr,
+      final boolean beamContext,
+      final String transformId,
+      final int bundleNr,
       final Date date,
       final boolean ziparchive,
       final boolean showSamples,
@@ -1246,6 +1326,11 @@ public class TextFileOutputMeta extends BaseTransformMeta<TextFileOutput, TextFi
     if (meta.getSplitEvery(variables) > 0) {
       retval += "_" + splitnr;
     }
+    // In a Beam context, always add the transform ID and bundle number
+    //
+    if (beamContext) {
+      retval += "_" + transformId + "_" + bundleNr;
+    }
 
     if ("Zip".equals(meta.getFileCompression())) {
       if (ziparchive) {
diff --git a/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMetaTest.java b/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMetaTest.java
index b07b4daa0f..92f35c9b3e 100644
--- a/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMetaTest.java
+++ b/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputMetaTest.java
@@ -176,11 +176,11 @@ public class TextFileOutputMetaTest {
     meta.setSplitEveryRows("${splitVar}");
     IVariables varSpace = new Variables();
     assertEquals(0, meta.getSplitEvery(varSpace));
-    String fileName = meta.buildFilename("foo", "txt2", varSpace, 0, null, 3, false, meta);
+    String fileName = meta.buildFilename("foo", "txt2", varSpace, 0, null, 3, false, null, 0, false, meta);
     assertEquals("foo.txt2", fileName);
     varSpace.setVariable("splitVar", "2");
     assertEquals(2, meta.getSplitEvery(varSpace));
-    fileName = meta.buildFilename("foo", "txt2", varSpace, 0, null, 5, false, meta);
+    fileName = meta.buildFilename("foo", "txt2", varSpace, 0, null, 5, false, null, 0, false, meta);
     assertEquals("foo_5.txt2", fileName);
   }
 
diff --git a/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputTest.java b/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputTest.java
index f58f0f8e71..c51d84ad7a 100644
--- a/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputTest.java
+++ b/plugins/transforms/textfile/src/test/java/org/apache/hop/pipeline/transforms/textfileoutput/TextFileOutputTest.java
@@ -371,6 +371,9 @@ public class TextFileOutputTest {
                 Mockito.anyString(),
                 Mockito.anyInt(),
                 Mockito.anyBoolean(),
+                Mockito.anyString(),
+                Mockito.anyInt(),
+                Mockito.anyBoolean(),
                 Mockito.any(TextFileOutputMeta.class)))
         .thenReturn(TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION);
 
@@ -461,6 +464,9 @@ public class TextFileOutputTest {
                 Mockito.anyInt(),
                 Mockito.anyString(),
                 Mockito.anyInt(),
+                    Mockito.anyBoolean(),
+                    Mockito.anyString(),
+                    Mockito.anyInt(),
                 Mockito.anyBoolean(),
                 Mockito.any(TextFileOutputMeta.class)))
         .thenReturn(pathToFile);
@@ -642,6 +648,9 @@ public class TextFileOutputTest {
                 0,
                 null,
                 0,
+                false,
+                null,
+                0,
                 true,
                 transformMockHelper.iTransformMeta))
         .thenReturn(TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION);
@@ -719,6 +728,9 @@ public class TextFileOutputTest {
                 0,
                 null,
                 0,
+                false,
+                null,
+                0,
                 true,
                 transformMockHelper.iTransformMeta))
         .thenReturn(TEXT_FILE_OUTPUT_PREFIX + TEXT_FILE_OUTPUT_EXTENSION);