You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hop.apache.org by ha...@apache.org on 2022/11/02 15:07:23 UTC

[hop] branch master updated: HOP-4444: Align Beam join output with Hop output

This is an automated email from the ASF dual-hosted git repository.

hansva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hop.git


The following commit(s) were added to refs/heads/master by this push:
     new 16b14a65c8 HOP-4444: Align Beam join output with Hop output
     new 969dafd737 Merge pull request #1772 from hansva/master
16b14a65c8 is described below

commit 16b14a65c88539ace111ef8800f1078b1d239733
Author: Hans Van Akelyen <ha...@gmail.com>
AuthorDate: Wed Nov 2 14:35:09 2022 +0100

    HOP-4444: Align Beam join output with Hop output
---
 .../hop/beam/core/fn/MergeJoinAssemblerFn.java     | 47 +---------------------
 .../handler/BeamMergeJoinTransformHandler.java     | 12 ++----
 2 files changed, 6 insertions(+), 53 deletions(-)

diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/fn/MergeJoinAssemblerFn.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/fn/MergeJoinAssemblerFn.java
index df7b7d1a76..9ec485c30e 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/fn/MergeJoinAssemblerFn.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/fn/MergeJoinAssemblerFn.java
@@ -155,7 +155,6 @@ public class MergeJoinAssemblerFn extends DoFn<KV<HopRow, KV<HopRow, HopRow>>, H
       KV<HopRow, KV<HopRow, HopRow>> element = processContext.element();
       KV<HopRow, HopRow> value = element.getValue();
 
-      HopRow key = element.getKey();
       HopRow leftValue = value.getKey();
       HopRow rightValue = value.getValue();
 
@@ -169,7 +168,7 @@ public class MergeJoinAssemblerFn extends DoFn<KV<HopRow, KV<HopRow, HopRow>>, H
         if (leftValue.isNotEmpty()) {
           Integer keyIndex = leftKeyIndexes.get(i);
           if (keyIndex != null) {
-            outputRow[i] = key.getRow()[keyIndex];
+            outputRow[i] = leftValue.getRow()[keyIndex];
           }
           Integer valueIndex = leftValueIndexes.get(i);
           if (valueIndex != null) {
@@ -184,7 +183,7 @@ public class MergeJoinAssemblerFn extends DoFn<KV<HopRow, KV<HopRow, HopRow>>, H
         if (rightValue.isNotEmpty()) {
           Integer keyIndex = rightKeyIndexes.get(i);
           if (keyIndex != null) {
-            outputRow[leftRowMeta.size()+i] = key.getRow()[keyIndex];
+            outputRow[leftRowMeta.size()+i] = rightValue.getRow()[keyIndex];
           }
           Integer valueIndex = rightValueIndexes.get(i);
           if (valueIndex != null) {
@@ -193,48 +192,6 @@ public class MergeJoinAssemblerFn extends DoFn<KV<HopRow, KV<HopRow, HopRow>>, H
         }
       }
 
-      /*// Hop style, first the left values
-      //
-      if (leftValue.allNull()) {
-        index += leftVRowMeta.size();
-      } else {
-        for (int i = 0; i < leftVRowMeta.size(); i++) {
-          outputRow[index++] = leftValue.getRow()[i];
-        }
-      }
-
-      // Now the left key
-      //
-      if (leftValue.allNull()) {
-        index += leftKRowMeta.size();
-      } else {
-        for (int i = 0; i < leftKRowMeta.size(); i++) {
-          outputRow[index++] = key.getRow()[i];
-        }
-      }
-
-      // Then the right key
-      //
-      if (rightValue.allNull()) {
-        // No right key given if the value is null
-        //
-        index += leftKRowMeta.size();
-      } else {
-        for (int i = 0; i < leftKRowMeta.size(); i++) {
-          outputRow[index++] = key.getRow()[i];
-        }
-      }
-
-      // Finally the right values
-      //
-      if (rightValue.allNull()) {
-        index += rightVRowMeta.size();
-      } else {
-        for (int i = 0; i < rightVRowMeta.size(); i++) {
-          outputRow[index++] = rightValue.getRow()[i];
-        }
-      }*/
-
       processContext.output(new HopRow(outputRow));
       writtenCounter.inc();
 
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java
index 2469397dc3..f2f8714004 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java
@@ -129,10 +129,8 @@ public class BeamMergeJoinTransformHandler extends BeamBaseTransformHandler
     }
     for (IValueMeta valueMeta : leftRowMeta.getValueMetaList()) {
       String valueName = valueMeta.getName();
-      if (Const.indexOfString(valueName, leftKeys) < 0) {
-        leftV.add(valueName);
-        leftVRowMeta.addValueMeta(valueMeta.clone());
-      }
+      leftV.add(valueName);
+      leftVRowMeta.addValueMeta(valueMeta.clone());
     }
 
     HopKeyValueFn leftKVFn =
@@ -156,10 +154,8 @@ public class BeamMergeJoinTransformHandler extends BeamBaseTransformHandler
     }
     for (IValueMeta valueMeta : rightRowMeta.getValueMetaList()) {
       String valueName = valueMeta.getName();
-      if (Const.indexOfString(valueName, rightKeys) < 0) {
-        rightV.add(valueName);
-        rightVRowMeta.addValueMeta(valueMeta.clone());
-      }
+      rightV.add(valueName);
+      rightVRowMeta.addValueMeta(valueMeta.clone());
     }
 
     HopKeyValueFn rightKVFn =