You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/12/07 03:28:10 UTC
[10/18] samza-hello-samza git commit: SAMZA-1963: Verify table samples in samza-hello-samples
SAMZA-1963: Verify table samples in samza-hello-samples
Author: Wei Song <ws...@linkedin.com>
Reviewers: Xinyu Liu <xi...@apache.org>
Closes #40 from weisong44/SAMZA-1963 and squashes the following commits:
473608b [Wei Song] Verified and fixed minor issues for local and remote table samples
ff2c171 [Wei Song] Merged from upstream/latest
43e4123 [Wei Song] Updated based on review comments
d2eb472 [Wei Song] Updated based on review comments
3274501 [Wei Song] Added sample for remote table
bc0d047 [Wei Song] Merge remote-tracking branch 'upstream/latest' into latest
ecde9a6 [Wei Song] Added stream-table join sample to cookbook
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/4a73e0d9
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/4a73e0d9
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/4a73e0d9
Branch: refs/heads/master
Commit: 4a73e0d96bd9a604452486d31d44ba35001b18ef
Parents: 4ec614c
Author: Wei Song <ws...@linkedin.com>
Authored: Fri Oct 19 12:31:14 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Fri Oct 19 12:31:14 2018 -0700
----------------------------------------------------------------------
src/main/config/remote-table-join-example.properties | 2 +-
.../java/samza/examples/cookbook/RemoteTableJoinExample.java | 1 +
.../java/samza/examples/cookbook/StreamTableJoinExample.java | 8 ++++----
3 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/4a73e0d9/src/main/config/remote-table-join-example.properties
----------------------------------------------------------------------
diff --git a/src/main/config/remote-table-join-example.properties b/src/main/config/remote-table-join-example.properties
index 0addbc8..9680471 100644
--- a/src/main/config/remote-table-join-example.properties
+++ b/src/main/config/remote-table-join-example.properties
@@ -19,7 +19,7 @@
app.class=samza.examples.cookbook.RemoteTableJoinExample
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=stock-price-table-joiner
-job.container.count=1
+job.container.count=2
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/4a73e0d9/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java b/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java
index 1c27cda..2418ae6 100644
--- a/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java
+++ b/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java
@@ -123,6 +123,7 @@ public class RemoteTableJoinExample implements StreamApplication {
kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, new StringSerde());
KafkaOutputDescriptor<StockPrice> stockPriceOutputDescriptor =
kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, new JsonSerdeV2<>(StockPrice.class));
+ appDescriptor.withDefaultSystem(kafkaSystemDescriptor);
MessageStream<String> stockSymbolStream = appDescriptor.getInputStream(stockSymbolInputDescriptor);
OutputStream<StockPrice> stockPriceStream = appDescriptor.getOutputStream(stockPriceOutputDescriptor);
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/4a73e0d9/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java b/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java
index d9f6acf..4fb6254 100644
--- a/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java
+++ b/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java
@@ -62,6 +62,10 @@ import java.util.Map;
* ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/stream-table-join-example.properties
* </li>
* <li>
+ * Consume messages from the "enriched-pageview-join-output" topic <br/>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic enriched-pageview-join-output
+ * </li>
+ * <li>
* Produce some messages to the "profile-table-input" topic with the same userId <br/>
* ./deploy/kafka/bin/kafka-console-producer.sh --topic profile-table-input --broker-list localhost:9092 <br/>
* {"userId": "user1", "company": "LNKD"} <br/>
@@ -73,10 +77,6 @@ import java.util.Map;
* {"userId": "user1", "country": "india", "pageId":"google.com"} <br/>
* {"userId": "user2", "country": "china", "pageId":"yahoo.com"}
* </li>
- * <li>
- * Consume messages from the "enriched-pageview-join-output" topic <br/>
- * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic enriched-pageview-join-output
- * </li>
* </ol>
*
*/