You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/23 02:05:03 UTC
samza git commit: Changes to Standalone integration tests
Repository: samza
Updated Branches:
refs/heads/master 00cd44ac9 -> 738410392
Changes to Standalone integration tests
* Removing task.inputs since it's not required in the new Samza 1.0 API
* Fixing input/output system bug: the output descriptor was being created with the input stream name.
* Fixing jobModel ZK path bug: the node name is camel case (jobModelGeneration, not JobModelGeneration).
Author: rmatharu@linkedin.com <rm...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #756 from rmatharu/standalonetests
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/73841039
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/73841039
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/73841039
Branch: refs/heads/master
Commit: 738410392036746fc51b7360a97d4b1f29ea48dc
Parents: 00cd44a
Author: rmatharu@linkedin.com <rm...@linkedin.com>
Authored: Mon Oct 22 19:04:58 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Mon Oct 22 19:04:58 2018 -0700
----------------------------------------------------------------------
.../src/main/config/standalone.failure.test.properties | 1 -
.../integration/TestStandaloneIntegrationApplication.java | 6 +++---
samza-test/src/main/python/tests/zk_client.py | 8 ++++----
3 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/73841039/samza-test/src/main/config/standalone.failure.test.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/standalone.failure.test.properties b/samza-test/src/main/config/standalone.failure.test.properties
index d855d5f..d200251 100644
--- a/samza-test/src/main/config/standalone.failure.test.properties
+++ b/samza-test/src/main/config/standalone.failure.test.properties
@@ -26,7 +26,6 @@ job.name=test-app-name
job.id=test-app-id
## Kafka I/O system properties.
-task.inputs=standalone_integration_test_kafka_input_topic
input.stream.name=standalone_integration_test_kafka_input_topic
job.default.system=testSystemName
systems.testSystemName.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
http://git-wip-us.apache.org/repos/asf/samza/blob/73841039/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
index 2002ce6..fba3b52 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
@@ -40,15 +40,15 @@ public class TestStandaloneIntegrationApplication implements StreamApplication {
public void describe(StreamApplicationDescriptor appDescriptor) {
String systemName = "testSystemName";
String inputStreamName = appDescriptor.getConfig().get("input.stream.name");
- String outputStreamName = "standaloneIntegrationTestKafkaOutputTopic";
- LOGGER.info("Publishing message from: {} to: {}.", inputStreamName, outputStreamName);
+ String outputStreamName = "standaloneIntegrationTestKafkaOutputTopic";
+ LOGGER.info("Publishing message from: {} to: {}.", inputStreamName, outputStreamName);
KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(systemName);
KVSerde<Object, Object> noOpSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
KafkaInputDescriptor<KV<Object, Object>> isd =
kafkaSystemDescriptor.getInputDescriptor(inputStreamName, noOpSerde);
KafkaOutputDescriptor<KV<Object, Object>> osd =
- kafkaSystemDescriptor.getOutputDescriptor(inputStreamName, noOpSerde);
+ kafkaSystemDescriptor.getOutputDescriptor(outputStreamName, noOpSerde);
appDescriptor.getInputStream(isd).sendTo(appDescriptor.getOutputStream(osd));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/73841039/samza-test/src/main/python/tests/zk_client.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests/zk_client.py b/samza-test/src/main/python/tests/zk_client.py
index 2a11a80..006d84f 100644
--- a/samza-test/src/main/python/tests/zk_client.py
+++ b/samza-test/src/main/python/tests/zk_client.py
@@ -47,8 +47,8 @@ class ZkClient:
self.kazoo_client.stop()
def watch_job_model(self, watch_function):
- self.kazoo_client.ensure_path('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node))
- self.kazoo_client.get_children('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node), watch=watch_function)
+ self.kazoo_client.ensure_path('{0}/jobModelGeneration/jobModels/'.format(self.zk_base_node))
+ self.kazoo_client.get_children('{0}/jobModelGeneration/jobModels/'.format(self.zk_base_node), watch=watch_function)
def get_latest_job_model(self):
"""
@@ -56,12 +56,12 @@ class ZkClient:
"""
job_model_dict = {}
try:
- childZkNodes = self.kazoo_client.get_children('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node))
+ childZkNodes = self.kazoo_client.get_children('{0}/jobModelGeneration/jobModels/'.format(self.zk_base_node))
if len(childZkNodes) > 0:
childZkNodes.sort()
childZkNodes.reverse()
- job_model_generation_path = '{0}/JobModelGeneration/jobModels/{1}/'.format(self.zk_base_node, childZkNodes[0])
+ job_model_generation_path = '{0}/jobModelGeneration/jobModels/{1}/'.format(self.zk_base_node, childZkNodes[0])
job_model, _ = self.kazoo_client.get(job_model_generation_path)
"""