You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hop.apache.org by ha...@apache.org on 2022/11/02 15:07:23 UTC
[hop] branch master updated: HOP-4444: Align Beam join output with Hop output
This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/master by this push:
new 16b14a65c8 HOP-4444: Align Beam join output with Hop output
new 969dafd737 Merge pull request #1772 from hansva/master
16b14a65c8 is described below
commit 16b14a65c88539ace111ef8800f1078b1d239733
Author: Hans Van Akelyen <ha...@gmail.com>
AuthorDate: Wed Nov 2 14:35:09 2022 +0100
HOP-4444: Align Beam join output with Hop output
---
.../hop/beam/core/fn/MergeJoinAssemblerFn.java | 47 +---------------------
.../handler/BeamMergeJoinTransformHandler.java | 12 ++----
2 files changed, 6 insertions(+), 53 deletions(-)
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/fn/MergeJoinAssemblerFn.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/fn/MergeJoinAssemblerFn.java
index df7b7d1a76..9ec485c30e 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/fn/MergeJoinAssemblerFn.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/fn/MergeJoinAssemblerFn.java
@@ -155,7 +155,6 @@ public class MergeJoinAssemblerFn extends DoFn<KV<HopRow, KV<HopRow, HopRow>>, H
KV<HopRow, KV<HopRow, HopRow>> element = processContext.element();
KV<HopRow, HopRow> value = element.getValue();
- HopRow key = element.getKey();
HopRow leftValue = value.getKey();
HopRow rightValue = value.getValue();
@@ -169,7 +168,7 @@ public class MergeJoinAssemblerFn extends DoFn<KV<HopRow, KV<HopRow, HopRow>>, H
if (leftValue.isNotEmpty()) {
Integer keyIndex = leftKeyIndexes.get(i);
if (keyIndex != null) {
- outputRow[i] = key.getRow()[keyIndex];
+ outputRow[i] = leftValue.getRow()[keyIndex];
}
Integer valueIndex = leftValueIndexes.get(i);
if (valueIndex != null) {
@@ -184,7 +183,7 @@ public class MergeJoinAssemblerFn extends DoFn<KV<HopRow, KV<HopRow, HopRow>>, H
if (rightValue.isNotEmpty()) {
Integer keyIndex = rightKeyIndexes.get(i);
if (keyIndex != null) {
- outputRow[leftRowMeta.size()+i] = key.getRow()[keyIndex];
+ outputRow[leftRowMeta.size()+i] = rightValue.getRow()[keyIndex];
}
Integer valueIndex = rightValueIndexes.get(i);
if (valueIndex != null) {
@@ -193,48 +192,6 @@ public class MergeJoinAssemblerFn extends DoFn<KV<HopRow, KV<HopRow, HopRow>>, H
}
}
- /*// Hop style, first the left values
- //
- if (leftValue.allNull()) {
- index += leftVRowMeta.size();
- } else {
- for (int i = 0; i < leftVRowMeta.size(); i++) {
- outputRow[index++] = leftValue.getRow()[i];
- }
- }
-
- // Now the left key
- //
- if (leftValue.allNull()) {
- index += leftKRowMeta.size();
- } else {
- for (int i = 0; i < leftKRowMeta.size(); i++) {
- outputRow[index++] = key.getRow()[i];
- }
- }
-
- // Then the right key
- //
- if (rightValue.allNull()) {
- // No right key given if the value is null
- //
- index += leftKRowMeta.size();
- } else {
- for (int i = 0; i < leftKRowMeta.size(); i++) {
- outputRow[index++] = key.getRow()[i];
- }
- }
-
- // Finally the right values
- //
- if (rightValue.allNull()) {
- index += rightVRowMeta.size();
- } else {
- for (int i = 0; i < rightVRowMeta.size(); i++) {
- outputRow[index++] = rightValue.getRow()[i];
- }
- }*/
-
processContext.output(new HopRow(outputRow));
writtenCounter.inc();
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java
index 2469397dc3..f2f8714004 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java
@@ -129,10 +129,8 @@ public class BeamMergeJoinTransformHandler extends BeamBaseTransformHandler
}
for (IValueMeta valueMeta : leftRowMeta.getValueMetaList()) {
String valueName = valueMeta.getName();
- if (Const.indexOfString(valueName, leftKeys) < 0) {
- leftV.add(valueName);
- leftVRowMeta.addValueMeta(valueMeta.clone());
- }
+ leftV.add(valueName);
+ leftVRowMeta.addValueMeta(valueMeta.clone());
}
HopKeyValueFn leftKVFn =
@@ -156,10 +154,8 @@ public class BeamMergeJoinTransformHandler extends BeamBaseTransformHandler
}
for (IValueMeta valueMeta : rightRowMeta.getValueMetaList()) {
String valueName = valueMeta.getName();
- if (Const.indexOfString(valueName, rightKeys) < 0) {
- rightV.add(valueName);
- rightVRowMeta.addValueMeta(valueMeta.clone());
- }
+ rightV.add(valueName);
+ rightVRowMeta.addValueMeta(valueMeta.clone());
}
HopKeyValueFn rightKVFn =