You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 01:34:49 UTC
[09/12] samza git commit: Consolidating package names for System,
Stream, Application and Table descriptors.
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsOutputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsOutputDescriptor.java
deleted file mode 100644
index e59b4d0..0000000
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsOutputDescriptor.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system.eventhub;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * A descriptor for an Event Hubs output stream
- * <p>
- * An instance of this descriptor may be obtained from and {@link EventHubsSystemDescriptor}
- * </p>
- * Stream properties configured using a descriptor overrides corresponding properties and property defaults provided
- * in configuration.
- *
- * @param <StreamMessageType> type of messages in this stream
- */
-public class EventHubsOutputDescriptor<StreamMessageType>
- extends OutputDescriptor<StreamMessageType, EventHubsOutputDescriptor<StreamMessageType>> {
- private String namespace;
- private String entityPath;
- private Optional<String> sasKeyName = Optional.empty();
- private Optional<String> sasToken = Optional.empty();
-
- /**
- * Constructs an {@link OutputDescriptor} instance.
- *
- * @param streamId id of the stream
- * @param namespace namespace for the Event Hubs entity to produce to, not null
- * @param entityPath entity path for the Event Hubs entity to produce to, not null
- * @param serde serde for messages in the stream
- * @param systemDescriptor system descriptor this stream descriptor was obtained from
- */
- EventHubsOutputDescriptor(String streamId, String namespace, String entityPath, Serde serde,
- SystemDescriptor systemDescriptor) {
- super(streamId, serde, systemDescriptor);
- this.namespace = StringUtils.stripToNull(namespace);
- this.entityPath = StringUtils.stripToNull(entityPath);
- if (this.namespace == null || this.entityPath == null) {
- throw new ConfigException(String.format("Missing namespace and entity path Event Hubs output descriptor in " //
- + "system: {%s}, stream: {%s}", getSystemName(), streamId));
- }
- }
-
- /**
- * SAS Key name of the associated output stream. Required to access the output Event Hubs entity per stream.
- *
- * @param sasKeyName the name of the SAS key required to access the Event Hubs entity
- * @return this output descriptor
- */
- public EventHubsOutputDescriptor<StreamMessageType> withSasKeyName(String sasKeyName) {
- this.sasKeyName = Optional.of(StringUtils.stripToNull(sasKeyName));
- return this;
- }
-
- /**
- * SAS Token of the associated output stream. Required to access the output Event Hubs per stream.
- *
- * @param sasToken the SAS token required to access to Event Hubs entity
- * @return this output descriptor
- */
- public EventHubsOutputDescriptor<StreamMessageType> withSasKey(String sasToken) {
- this.sasToken = Optional.of(StringUtils.stripToNull(sasToken));
- return this;
- }
-
- @Override
- public Map<String, String> toConfig() {
- HashMap<String, String> ehConfigs = new HashMap<>(super.toConfig());
-
- String streamId = getStreamId();
-
- ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId), namespace);
- ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId), entityPath);
- sasKeyName.ifPresent(keyName ->
- ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId), keyName));
- sasToken.ifPresent(key ->
- ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId), key));
- return ehConfigs;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsSystemDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsSystemDescriptor.java
deleted file mode 100644
index 189340f..0000000
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsSystemDescriptor.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system.eventhub;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
-
-
-/**
- * A descriptor for a Event Hubs system.
- * <p>
- * System properties configured using a descriptor override corresponding properties provided in configuration.
- */
-public class EventHubsSystemDescriptor extends SystemDescriptor<EventHubsSystemDescriptor> {
- private static final String FACTORY_CLASS_NAME = EventHubSystemFactory.class.getName();
-
- private List<String> streamIds = new ArrayList<>();
- private Optional<Integer> fetchRuntimeInfoTimeout = Optional.empty();
- private Optional<Integer> numClientThreads = Optional.empty();
- private Optional<Integer> consumerReceiveQueueSize = Optional.empty();
- private Optional<Integer> consumerMaxEventCountPerPoll = Optional.empty();
- private Optional<Integer> consumerPrefetchCount = Optional.empty();
- private Optional<Boolean> producerEventhubsSendKey = Optional.empty();
- private Optional<PartitioningMethod> producerEventhubsPartitioningMethod = Optional.empty();
-
- /**
- * Constructs a {@link SystemDescriptor} instance.
- * @param systemName name of this system
- */
- public EventHubsSystemDescriptor(String systemName) {
- super(systemName, FACTORY_CLASS_NAME, null, null);
- }
-
- /**
- * Gets an {@link EventHubsInputDescriptor} for the input stream of this system. The stream has the provided
- * namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
- * <p>
- * The type of messages in the stream is the type of the provided stream level serde.
- *
- * @param streamId id of the input stream
- * @param namespace namespace of the Event Hubs entity to consume from
- * @param entityPath entity path of the Event Hubs entity to consume from
- * @param serde stream level serde for the input stream
- * @param <StreamMessageType> type of messages in this stream
- * @return an {@link EventHubsInputDescriptor} for the Event Hubs input stream
- */
- public <StreamMessageType> EventHubsInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, String namespace,
- String entityPath, Serde<StreamMessageType> serde) {
- streamIds.add(streamId);
- return new EventHubsInputDescriptor<>(streamId, namespace, entityPath, serde, this);
- }
-
- /**
- * Gets an {@link EventHubsOutputDescriptor} for the output stream of this system. The stream has the provided
- * namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
- * <p>
- * The type of messages in the stream is the type of the provided stream level serde.
- *
- * @param streamId id of the output stream
- * @param namespace namespace of the Event Hubs entity to produce to
- * @param entityPath entity path of the Event Hubs entity to produce to
- * @param serde stream level serde for the output stream
- * @param <StreamMessageType> type of the messages in this stream
- * @return an {@link EventHubsOutputDescriptor} for the Event Hubs output stream
- */
- public <StreamMessageType> EventHubsOutputDescriptor<StreamMessageType> getOutputDescriptor(String streamId, String namespace,
- String entityPath, Serde<StreamMessageType> serde) {
- streamIds.add(streamId);
- return new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, serde, this);
- }
-
- /**
- * Timeout for fetching the runtime metadata from an Event Hubs entity on startup in millis.
- *
- * @param timeoutMS the timeout in ms for getting runtime information from the Event Hubs system
- * @return this system descriptor
- */
- public EventHubsSystemDescriptor withRuntimeInfoTimeout(int timeoutMS) {
- this.fetchRuntimeInfoTimeout = Optional.of(timeoutMS);
- return this;
- }
-
- /**
- * Number of threads in thread pool that will be used by the EventHubClient.
- *
- * @param numClientThreads the number of threads
- * @return this system descriptor
- */
- public EventHubsSystemDescriptor withNumClientThreads(int numClientThreads) {
- this.numClientThreads = Optional.of(numClientThreads);
- return this;
- }
-
- /**
- * Per partition capacity of the Event Hubs consumer buffer - the blocking queue used for storing messages.
- * Larger buffer capacity typically leads to better throughput but consumes more memory.
- *
- * @param receiveQueueSize the number of messages from Event Hubs that should be buffered in the
- * {@link org.apache.samza.util.BlockingEnvelopeMap}
- * @return this system descriptor
- */
- public EventHubsSystemDescriptor withReceiveQueueSize(int receiveQueueSize) {
- this.consumerReceiveQueueSize = Optional.of(receiveQueueSize);
- return this;
- }
-
- /**
- * Maximum number of events that Event Hubs client can return in a receive call.
- *
- * @param count the number of max events per poll
- * @return this system descriptor
- */
- public EventHubsSystemDescriptor withMaxEventCountPerPoll(int count) {
- this.consumerMaxEventCountPerPoll = Optional.of(count);
- return this;
- }
-
- /**
- * Number of events that Event Hubs client should prefetch from the server.
- *
- * @param count the number of events that should be prefetched.
- * @return this system descriptor
- */
- public EventHubsSystemDescriptor withPrefetchCount(int count) {
- this.consumerPrefetchCount = Optional.of(count);
- return this;
- }
-
-
- /**
- * Configure the method that the message is partitioned for the downstream Event Hubs in one of the following ways:
- * <ul>
- * <li>ROUND_ROBIN:
- * The message key and partition key are ignored and the message
- * will be distributed in a round-robin fashion amongst all the partitions in the downstream Event Hubs entity.</li>
- * <li>EVENT_HUB_HASHING:
- * Employs the hashing mechanism in Event Hubs to determine, based on the key of the message,
- * which partition the message should go. Using this method still ensures that all the events with
- * the same key are sent to the same partition in the event hub. If this option is chosen, the partition
- * key used for the hash should be a string. If the partition key is not set, the message key is
- * used instead.</li>
- * <li>PARTITION_KEY_AS_PARTITION:
- * Use the integer key specified by the partition key or key of the message to a specific partition
- * on Event Hubs. If the integer key is greater than the number of partitions in the destination Event Hubs entity,
- * a modulo operation will be performed to determine the resulting paritition.
- * ie. if there are 6 partitions and the key is 9, the message will end up in partition 3.
- * Similarly to EVENT_HUB_HASHING, if the partition key is not set the message key is used instead.</li>
- * </ul>
- * @param partitioningMethod the desired partitioning method for the message in the downstream Event Hubs entity
- * @return this system descriptor
- */
- public EventHubsSystemDescriptor withPartitioningMethod(PartitioningMethod partitioningMethod) {
- this.producerEventhubsPartitioningMethod = Optional.ofNullable(partitioningMethod);
- return this;
- }
-
- /**
- * If set to true, the key of the Samza message will be included as the 'key' property in the outgoing EventData
- * message for Event Hubs. The Samza message key will not be sent otherwise.
- * Note: If the Samza Event Hubs consumer is used, this field is the partition key of the received EventData, or the
- * message key if the partition key is not present.
- *
- * @param sendKeys set to true if the message key should be sent in the EventData properties, the key is not sent otherwise
- * @return this system descriptor
- */
- public EventHubsSystemDescriptor withSendKeys(boolean sendKeys) {
- this.producerEventhubsSendKey = Optional.of(sendKeys);
- return this;
- }
-
- @Override
- public Map<String, String> toConfig() {
- Map<String, String> ehConfigs = new HashMap<>(super.toConfig());
- String systemName = getSystemName();
-
- if (!this.streamIds.isEmpty()) {
- ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), String.join(",", this.streamIds));
- }
- this.fetchRuntimeInfoTimeout.ifPresent(timeout ->
- ehConfigs.put(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName), Integer.toString(timeout)));
- this.numClientThreads.ifPresent(numClientThreads ->
- ehConfigs.put(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName), Integer.toString(numClientThreads)));
- this.consumerReceiveQueueSize.ifPresent(receiveQueueSize ->
- ehConfigs.put(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName), Integer.toString(receiveQueueSize)));
- this.consumerMaxEventCountPerPoll.ifPresent(count ->
- ehConfigs.put(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName), Integer.toString(count)));
- this.consumerPrefetchCount.ifPresent(count ->
- ehConfigs.put(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName), Integer.toString(count)));
- this.producerEventhubsSendKey.ifPresent(sendKeys ->
- ehConfigs.put(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName), Boolean.toString(sendKeys)));
- this.producerEventhubsPartitioningMethod.ifPresent(partitioningMethod ->
- ehConfigs.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName), partitioningMethod.toString()));
- return ehConfigs;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
new file mode 100644
index 0000000..cce716c
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.eventhub.EventHubConfig;
+
+
+/**
+ * A descriptor for an Event Hubs input stream
+ *<p>
+ * An instance of this descriptor may be obtained from an {@link EventHubsSystemDescriptor}
+ *</p>
+ * Stream properties configured using a descriptor overrides corresponding properties and property defaults provided
+ * in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream
+ */
+public class EventHubsInputDescriptor<StreamMessageType>
+ extends InputDescriptor<StreamMessageType, EventHubsInputDescriptor<StreamMessageType>> {
+ private String namespace;
+ private String entityPath;
+ private Optional<String> sasKeyName = Optional.empty();
+ private Optional<String> sasToken = Optional.empty();
+ private Optional<String> consumerGroup = Optional.empty();
+
+ /**
+  * Constructs an {@link InputDescriptor} instance, rejecting a null or blank namespace or entity path.
+  *
+  * @param streamId id of the stream
+  * @param namespace namespace for the Event Hubs entity to consume from, not null or blank
+  * @param entityPath entity path for the Event Hubs entity to consume from, not null or blank
+  * @param serde serde for messages in the stream
+  * @param systemDescriptor system descriptor this stream descriptor was obtained from
+  */
+ EventHubsInputDescriptor(String streamId, String namespace, String entityPath, Serde serde,
+     SystemDescriptor systemDescriptor) {
+   super(streamId, serde, systemDescriptor, null);
+   this.namespace = StringUtils.stripToNull(namespace);
+   this.entityPath = StringUtils.stripToNull(entityPath);
+   if (this.namespace == null || this.entityPath == null) {
+     throw new ConfigException(String.format("Missing namespace or entity path for Event Hubs input descriptor " //
+         + "in system: {%s}, stream: {%s}", getSystemName(), streamId));
+   }
+ }
+
+ /**
+  * SAS Key name of the associated input stream. Required to access the input Event Hubs entity per stream.
+  *
+  * @param sasKeyName the name of the SAS key required to access the Event Hubs entity; null or blank clears it
+  * @return this input descriptor
+  */
+ public EventHubsInputDescriptor<StreamMessageType> withSasKeyName(String sasKeyName) {
+   this.sasKeyName = Optional.ofNullable(StringUtils.stripToNull(sasKeyName));
+   return this;
+ }
+
+ /**
+  * SAS Token of the associated input stream. Required to access the input Event Hubs per stream.
+  *
+  * @param sasToken the SAS token required to access the Event Hubs entity; null or blank clears it
+  * @return this input descriptor
+  */
+ public EventHubsInputDescriptor<StreamMessageType> withSasKey(String sasToken) {
+   this.sasToken = Optional.ofNullable(StringUtils.stripToNull(sasToken));
+   return this;
+ }
+
+ /**
+  * Set the consumer group from the upstream Event Hubs entity that the consumer is part of. Defaults to the
+  * <code>$Default</code> group that is initially present in all Event Hubs entities (unless removed)
+  *
+  * @param consumerGroup the name of the consumer group upstream; null or blank clears it
+  * @return this input descriptor
+  */
+ public EventHubsInputDescriptor<StreamMessageType> withConsumerGroup(String consumerGroup) {
+   this.consumerGroup = Optional.ofNullable(StringUtils.stripToNull(consumerGroup));
+   return this;
+ }
+
+ @Override
+ public Map<String, String> toConfig() {
+   // Start from the configs produced by the parent descriptor and add the Event Hubs stream keys to them.
+   final Map<String, String> configs = new HashMap<>(super.toConfig());
+   final String id = getStreamId();
+   // Namespace and entity path are always written; the constructor rejects descriptors that lack either.
+   configs.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, id), this.namespace);
+   configs.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, id), this.entityPath);
+   // The remaining settings are written only when a value is present on this descriptor.
+   this.sasKeyName.ifPresent(value ->
+       configs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, id), value));
+   this.sasToken.ifPresent(value ->
+       configs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, id), value));
+   this.consumerGroup.ifPresent(value ->
+       configs.put(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, id), value));
+   return configs;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
new file mode 100644
index 0000000..cd17033
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.eventhub.EventHubConfig;
+
+/**
+ * A descriptor for an Event Hubs output stream
+ * <p>
+ * An instance of this descriptor may be obtained from an {@link EventHubsSystemDescriptor}
+ * </p>
+ * Stream properties configured using a descriptor overrides corresponding properties and property defaults provided
+ * in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream
+ */
+public class EventHubsOutputDescriptor<StreamMessageType>
+ extends OutputDescriptor<StreamMessageType, EventHubsOutputDescriptor<StreamMessageType>> {
+ private String namespace;
+ private String entityPath;
+ private Optional<String> sasKeyName = Optional.empty();
+ private Optional<String> sasToken = Optional.empty();
+
+ /**
+  * Constructs an {@link OutputDescriptor} instance, rejecting a null or blank namespace or entity path.
+  *
+  * @param streamId id of the stream
+  * @param namespace namespace for the Event Hubs entity to produce to, not null or blank
+  * @param entityPath entity path for the Event Hubs entity to produce to, not null or blank
+  * @param serde serde for messages in the stream
+  * @param systemDescriptor system descriptor this stream descriptor was obtained from
+  */
+ EventHubsOutputDescriptor(String streamId, String namespace, String entityPath, Serde serde,
+     SystemDescriptor systemDescriptor) {
+   super(streamId, serde, systemDescriptor);
+   this.namespace = StringUtils.stripToNull(namespace);
+   this.entityPath = StringUtils.stripToNull(entityPath);
+   if (this.namespace == null || this.entityPath == null) {
+     throw new ConfigException(String.format("Missing namespace or entity path for Event Hubs output descriptor " //
+         + "in system: {%s}, stream: {%s}", getSystemName(), streamId));
+   }
+ }
+
+ /**
+  * SAS Key name of the associated output stream. Required to access the output Event Hubs entity per stream.
+  *
+  * @param sasKeyName the name of the SAS key required to access the Event Hubs entity; null or blank clears it
+  * @return this output descriptor
+  */
+ public EventHubsOutputDescriptor<StreamMessageType> withSasKeyName(String sasKeyName) {
+   this.sasKeyName = Optional.ofNullable(StringUtils.stripToNull(sasKeyName));
+   return this;
+ }
+
+ /**
+  * SAS Token of the associated output stream. Required to access the output Event Hubs per stream.
+  *
+  * @param sasToken the SAS token required to access the Event Hubs entity; null or blank clears it
+  * @return this output descriptor
+  */
+ public EventHubsOutputDescriptor<StreamMessageType> withSasKey(String sasToken) {
+   this.sasToken = Optional.ofNullable(StringUtils.stripToNull(sasToken));
+   return this;
+ }
+
+ @Override
+ public Map<String, String> toConfig() {
+   final Map<String, String> configs = new HashMap<>(super.toConfig());
+   final String id = getStreamId();
+   // Namespace and entity path are always written; the constructor rejects descriptors that lack either.
+   configs.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, id), this.namespace);
+   configs.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, id), this.entityPath);
+   // SAS credentials are written only when a value is present on this descriptor.
+   this.sasKeyName.ifPresent(value ->
+       configs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, id), value));
+   this.sasToken.ifPresent(value ->
+       configs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, id), value));
+   return configs;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
new file mode 100644
index 0000000..4e292d9
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub.descriptors;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.apache.samza.system.eventhub.EventHubSystemFactory;
+import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
+
+
+/**
+ * A descriptor for an Event Hubs system.
+ * <p>
+ * System properties configured using a descriptor override corresponding properties provided in configuration.
+ */
+public class EventHubsSystemDescriptor extends SystemDescriptor<EventHubsSystemDescriptor> {
+ private static final String FACTORY_CLASS_NAME = EventHubSystemFactory.class.getName();
+
+ private List<String> streamIds = new ArrayList<>();
+ private Optional<Integer> fetchRuntimeInfoTimeout = Optional.empty();
+ private Optional<Integer> numClientThreads = Optional.empty();
+ private Optional<Integer> consumerReceiveQueueSize = Optional.empty();
+ private Optional<Integer> consumerMaxEventCountPerPoll = Optional.empty();
+ private Optional<Integer> consumerPrefetchCount = Optional.empty();
+ private Optional<Boolean> producerEventhubsSendKey = Optional.empty();
+ private Optional<PartitioningMethod> producerEventhubsPartitioningMethod = Optional.empty();
+
+ /**
+ * Constructs a {@link SystemDescriptor} instance.
+ * @param systemName name of this system
+ */
+ public EventHubsSystemDescriptor(String systemName) {
+ super(systemName, FACTORY_CLASS_NAME, null, null);
+ }
+
+ /**
+ * Gets an {@link EventHubsInputDescriptor} for the input stream of this system. The stream has the provided
+ * namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
+ * <p>
+ * The type of messages in the stream is the type of the provided stream level serde.
+ *
+ * @param streamId id of the input stream
+ * @param namespace namespace of the Event Hubs entity to consume from
+ * @param entityPath entity path of the Event Hubs entity to consume from
+ * @param serde stream level serde for the input stream
+ * @param <StreamMessageType> type of messages in this stream
+ * @return an {@link EventHubsInputDescriptor} for the Event Hubs input stream
+ */
+ public <StreamMessageType> EventHubsInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, String namespace,
+ String entityPath, Serde<StreamMessageType> serde) {
+ // Construct the descriptor before registering the stream id: if construction rejects the arguments
+ // (e.g. a missing namespace or entity path), the id must not linger in the system's stream list.
+ EventHubsInputDescriptor<StreamMessageType> inputDescriptor =
+ new EventHubsInputDescriptor<>(streamId, namespace, entityPath, serde, this);
+ streamIds.add(streamId);
+ return inputDescriptor;
+ }
+
+ /**
+ * Gets an {@link EventHubsOutputDescriptor} for the output stream of this system. The stream has the provided
+ * namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
+ * <p>
+ * The type of messages in the stream is the type of the provided stream level serde.
+ *
+ * @param streamId id of the output stream
+ * @param namespace namespace of the Event Hubs entity to produce to
+ * @param entityPath entity path of the Event Hubs entity to produce to
+ * @param serde stream level serde for the output stream
+ * @param <StreamMessageType> type of the messages in this stream
+ * @return an {@link EventHubsOutputDescriptor} for the Event Hubs output stream
+ */
+ public <StreamMessageType> EventHubsOutputDescriptor<StreamMessageType> getOutputDescriptor(String streamId, String namespace,
+ String entityPath, Serde<StreamMessageType> serde) {
+ // Construct the descriptor before registering the stream id: if construction rejects the arguments
+ // (e.g. a missing namespace or entity path), the id must not linger in the system's stream list.
+ EventHubsOutputDescriptor<StreamMessageType> outputDescriptor =
+ new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, serde, this);
+ streamIds.add(streamId);
+ return outputDescriptor;
+ }
+
+ /**
+ * Timeout for fetching the runtime metadata from an Event Hubs entity on startup in millis.
+ *
+ * @param timeoutMS the timeout in ms for getting runtime information from the Event Hubs system
+ * @return this system descriptor
+ */
+ public EventHubsSystemDescriptor withRuntimeInfoTimeout(int timeoutMS) {
+ this.fetchRuntimeInfoTimeout = Optional.of(timeoutMS);
+ return this;
+ }
+
+ /**
+ * Number of threads in thread pool that will be used by the EventHubClient.
+ *
+ * @param numClientThreads the number of threads
+ * @return this system descriptor
+ */
+ public EventHubsSystemDescriptor withNumClientThreads(int numClientThreads) {
+ this.numClientThreads = Optional.of(numClientThreads);
+ return this;
+ }
+
+ /**
+ * Per partition capacity of the Event Hubs consumer buffer - the blocking queue used for storing messages.
+ * Larger buffer capacity typically leads to better throughput but consumes more memory.
+ *
+ * @param receiveQueueSize the number of messages from Event Hubs that should be buffered in the
+ * {@link org.apache.samza.util.BlockingEnvelopeMap}
+ * @return this system descriptor
+ */
+ public EventHubsSystemDescriptor withReceiveQueueSize(int receiveQueueSize) {
+ this.consumerReceiveQueueSize = Optional.of(receiveQueueSize);
+ return this;
+ }
+
+ /**
+ * Maximum number of events that Event Hubs client can return in a receive call.
+ *
+ * @param count the number of max events per poll
+ * @return this system descriptor
+ */
+ public EventHubsSystemDescriptor withMaxEventCountPerPoll(int count) {
+ this.consumerMaxEventCountPerPoll = Optional.of(count);
+ return this;
+ }
+
+ /**
+ * Number of events that Event Hubs client should prefetch from the server.
+ *
+ * @param count the number of events that should be prefetched.
+ * @return this system descriptor
+ */
+ public EventHubsSystemDescriptor withPrefetchCount(int count) {
+ this.consumerPrefetchCount = Optional.of(count);
+ return this;
+ }
+
+
+ /**
+ * Configure the method that the message is partitioned for the downstream Event Hubs in one of the following ways:
+ * <ul>
+ * <li>ROUND_ROBIN:
+ * The message key and partition key are ignored and the message
+ * will be distributed in a round-robin fashion amongst all the partitions in the downstream Event Hubs entity.</li>
+ * <li>EVENT_HUB_HASHING:
+ * Employs the hashing mechanism in Event Hubs to determine, based on the key of the message,
+ * which partition the message should go. Using this method still ensures that all the events with
+ * the same key are sent to the same partition in the event hub. If this option is chosen, the partition
+ * key used for the hash should be a string. If the partition key is not set, the message key is
+ * used instead.</li>
+ * <li>PARTITION_KEY_AS_PARTITION:
+ * Maps the integer key specified by the partition key, or the key of the message, to a specific partition
+ * on Event Hubs. If the integer key is greater than the number of partitions in the destination Event Hubs entity,
+ * a modulo operation will be performed to determine the resulting partition.
+ * For example, if there are 6 partitions and the key is 9, the message will end up in partition 3.
+ * Similarly to EVENT_HUB_HASHING, if the partition key is not set the message key is used instead.</li>
+ * </ul>
+ * @param partitioningMethod the desired partitioning method for the message in the downstream Event Hubs entity
+ * @return this system descriptor
+ */
+ public EventHubsSystemDescriptor withPartitioningMethod(PartitioningMethod partitioningMethod) {
+ this.producerEventhubsPartitioningMethod = Optional.ofNullable(partitioningMethod);
+ return this;
+ }
+
+ /**
+ * If set to true, the key of the Samza message will be included as the 'key' property in the outgoing EventData
+ * message for Event Hubs. The Samza message key will not be sent otherwise.
+ * Note: If the Samza Event Hubs consumer is used, this field is the partition key of the received EventData, or the
+ * message key if the partition key is not present.
+ *
+ * @param sendKeys set to true if the message key should be sent in the EventData properties, the key is not sent otherwise
+ * @return this system descriptor
+ */
+ public EventHubsSystemDescriptor withSendKeys(boolean sendKeys) {
+ this.producerEventhubsSendKey = Optional.of(sendKeys);
+ return this;
+ }
+
+ @Override
+ public Map<String, String> toConfig() {
+ Map<String, String> ehConfigs = new HashMap<>(super.toConfig());
+ String systemName = getSystemName();
+
+ if (!this.streamIds.isEmpty()) {
+ ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), String.join(",", this.streamIds));
+ }
+ this.fetchRuntimeInfoTimeout.ifPresent(timeout ->
+ ehConfigs.put(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName), Integer.toString(timeout)));
+ this.numClientThreads.ifPresent(numClientThreads ->
+ ehConfigs.put(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName), Integer.toString(numClientThreads)));
+ this.consumerReceiveQueueSize.ifPresent(receiveQueueSize ->
+ ehConfigs.put(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName), Integer.toString(receiveQueueSize)));
+ this.consumerMaxEventCountPerPoll.ifPresent(count ->
+ ehConfigs.put(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName), Integer.toString(count)));
+ this.consumerPrefetchCount.ifPresent(count ->
+ ehConfigs.put(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName), Integer.toString(count)));
+ this.producerEventhubsSendKey.ifPresent(sendKeys ->
+ ehConfigs.put(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName), Boolean.toString(sendKeys)));
+ this.producerEventhubsPartitioningMethod.ifPresent(partitioningMethod ->
+ ehConfigs.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName), partitioningMethod.toString()));
+ return ehConfigs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsInputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsInputDescriptor.java
deleted file mode 100644
index b3003a3..0000000
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsInputDescriptor.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system.eventhub;
-
-import java.util.Map;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.operators.KV;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-
-public class TestEventHubsInputDescriptor {
- @Test
- public void testEntityConnectionConfigs() {
- String systemName = "eventHub";
- String streamId = "input-stream";
-
- EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
-
- EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
- .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
- .withSasKeyName("secretkey")
- .withSasKey("sasToken-123")
- .withConsumerGroup("$notdefault");
-
- Map<String, String> generatedConfigs = inputDescriptor.toConfig();
- assertEquals("eventHub", generatedConfigs.get("streams.input-stream.samza.system"));
- assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
- assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
- assertEquals("secretkey", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
- assertEquals("sasToken-123", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
- assertEquals("$notdefault", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId)));
- }
-
- @Test
- public void testWithoutEntityConnectionConfigs() {
- String systemName = "eventHub";
- String streamId = "input-stream";
-
- EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
-
- EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
- .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
-
- Map<String, String> generatedConfigs = inputDescriptor.toConfig();
- assertEquals("eventHub", generatedConfigs.get("streams.input-stream.samza.system"));
- assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
- assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
- assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
- assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
- assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId)));
- assertEquals(3, generatedConfigs.size()); // verify that there are no other configs
- }
-
- @Test
- public void testMissingInputDescriptorFields() {
- String systemName = "eventHub";
- String streamId = "input-stream";
-
- EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
- try {
- systemDescriptor.getInputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
- fail("Should have thrown Config Exception");
- } catch (ConfigException exception) {
- assertEquals(String.format("Missing namespace and entity path Event Hubs input descriptor in " //
- + "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsOutputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsOutputDescriptor.java
deleted file mode 100644
index fcfcdca..0000000
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsOutputDescriptor.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system.eventhub;
-
-import java.util.Map;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.operators.KV;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-public class TestEventHubsOutputDescriptor {
- @Test
- public void testEntityConnectionConfigs() {
- String systemName = "eventHub";
- String streamId = "output-stream";
-
- EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
-
- EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
- .getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
- .withSasKeyName("secretkey")
- .withSasKey("sasToken-123");
-
- Map<String, String> generatedConfigs = outputDescriptor.toConfig();
- assertEquals("eventHub", generatedConfigs.get("streams.output-stream.samza.system"));
- assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
- assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
- assertEquals("secretkey", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
- assertEquals("sasToken-123", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
- }
-
- @Test
- public void testWithoutEntityConnectionConfigs() {
- String systemName = "eventHub";
- String streamId = "output-stream";
-
- EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
-
- EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
- .getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
-
- Map<String, String> generatedConfigs = outputDescriptor.toConfig();
- assertEquals("eventHub", generatedConfigs.get("streams.output-stream.samza.system"));
- assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
- assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
- assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
- assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
- assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId)));
- assertEquals(3, generatedConfigs.size()); // verify that there are no other configs
- }
-
- @Test
- public void testMissingOutputDescriptorFields() {
- String systemName = "eventHub";
- String streamId = "input-stream";
-
- EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
- try {
- systemDescriptor.getOutputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
- fail("Should have thrown Config Exception");
- } catch (ConfigException exception) {
- assertEquals(String.format("Missing namespace and entity path Event Hubs output descriptor in " //
- + "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java
deleted file mode 100644
index 33bb1ba..0000000
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system.eventhub;
-
-import java.util.Map;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class TestEventHubsSystemDescriptor {
- @Test
- public void testWithDescriptorOverrides() {
- String systemName = "system-name";
- String streamId1 = "input-stream1";
- String streamId2 = "input-stream2";
- String streamId3 = "output-stream1";
- String streamId4 = "output-stream2";
-
- EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName)
- .withMaxEventCountPerPoll(1000)
- .withNumClientThreads(5)
- .withPartitioningMethod(PartitioningMethod.PARTITION_KEY_AS_PARTITION)
- .withPrefetchCount(100)
- .withReceiveQueueSize(500)
- .withRuntimeInfoTimeout(60000)
- .withSendKeys(false);
-
- systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
- systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
- systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
- systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
-
- Map<String, String> generatedConfigs = systemDescriptor.toConfig();
- assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
- assertEquals("1000", generatedConfigs.get(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName)));
- assertEquals("5", generatedConfigs.get(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName)));
- assertEquals("PARTITION_KEY_AS_PARTITION", generatedConfigs.get(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName)));
- assertEquals("100", generatedConfigs.get(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName)));
- assertEquals("500", generatedConfigs.get(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName)));
- assertEquals("60000", generatedConfigs.get(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName)));
- assertEquals("false", generatedConfigs.get(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName)));
- assertEquals(streamId1 + "," + streamId2 + "," + streamId3 + "," + streamId4, generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName)));
- }
-
- @Test
- public void testWithoutDescriptorOverrides() {
- String systemName = "eventHub";
-
- EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
-
- Map<String, String> generatedConfigs = systemDescriptor.toConfig();
- assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
- assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName)));
- assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName)));
- assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName)));
- assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName)));
- assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName)));
- assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName)));
- assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName)));
- assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName)));
- assertEquals(1, generatedConfigs.size());
- }
- @Test
- public void testWithInputOutputStreams() {
- String systemName = "system-name";
- String streamId1 = "input-stream1";
- String streamId2 = "input-stream2";
- String streamId3 = "output-stream1";
- String streamId4 = "output-stream2";
-
- EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
-
- systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
- systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
- systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
- systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
-
- Map<String, String> generatedConfigs = systemDescriptor.toConfig();
- assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
- assertEquals(streamId1 + "," + streamId2 + "," + streamId3 + "," + streamId4, generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName)));
- assertEquals(2, generatedConfigs.size());
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
new file mode 100644
index 0000000..1e6b368
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub.descriptors;
+
+import java.util.Map;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+
+public class TestEventHubsInputDescriptor {
+ @Test
+ public void testEntityConnectionConfigs() {
+ String systemName = "eventHub";
+ String streamId = "input-stream";
+
+ EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+ EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
+ .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
+ .withSasKeyName("secretkey")
+ .withSasKey("sasToken-123")
+ .withConsumerGroup("$notdefault");
+
+ Map<String, String> generatedConfigs = inputDescriptor.toConfig();
+ assertEquals("eventHub", generatedConfigs.get("streams.input-stream.samza.system"));
+ assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
+ assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
+ assertEquals("secretkey", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
+ assertEquals("sasToken-123", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
+ assertEquals("$notdefault", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId)));
+ }
+
+ @Test
+ public void testWithoutEntityConnectionConfigs() {
+ String systemName = "eventHub";
+ String streamId = "input-stream";
+
+ EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+ EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
+ .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
+
+ Map<String, String> generatedConfigs = inputDescriptor.toConfig();
+ assertEquals("eventHub", generatedConfigs.get("streams.input-stream.samza.system"));
+ assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
+ assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
+ assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
+ assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
+ assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId)));
+ assertEquals(3, generatedConfigs.size()); // verify that there are no other configs
+ }
+
+ @Test
+ public void testMissingInputDescriptorFields() {
+ String systemName = "eventHub";
+ String streamId = "input-stream";
+
+ EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+ try {
+ systemDescriptor.getInputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
+ fail("Should have thrown Config Exception");
+ } catch (ConfigException exception) {
+ assertEquals(String.format("Missing namespace and entity path Event Hubs input descriptor in " //
+ + "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
new file mode 100644
index 0000000..fa8ae56
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub.descriptors;
+
+import java.util.Map;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for the stream-level configs generated by {@link EventHubsOutputDescriptor}.
+ */
+public class TestEventHubsOutputDescriptor {
+  // Shared fixture names. Every test in this class describes the same output stream.
+  private static final String SYSTEM_NAME = "eventHub";
+  private static final String STREAM_ID = "output-stream";
+
+  /**
+   * A descriptor configured with a SAS key name and SAS key must emit the system, namespace,
+   * entity path and both SAS settings.
+   */
+  @Test
+  public void testEntityConnectionConfigs() {
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(SYSTEM_NAME);
+
+    EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
+        .getOutputDescriptor(STREAM_ID, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
+        .withSasKeyName("secretkey")
+        .withSasKey("sasToken-123");
+
+    Map<String, String> generatedConfigs = outputDescriptor.toConfig();
+    assertEquals("eventHub", generatedConfigs.get("streams.output-stream.samza.system"));
+    assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, STREAM_ID)));
+    assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, STREAM_ID)));
+    assertEquals("secretkey", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, STREAM_ID)));
+    assertEquals("sasToken-123", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, STREAM_ID)));
+  }
+
+  /**
+   * A descriptor without SAS settings must emit only the system, namespace and entity path.
+   */
+  @Test
+  public void testWithoutEntityConnectionConfigs() {
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(SYSTEM_NAME);
+
+    EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
+        .getOutputDescriptor(STREAM_ID, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
+
+    Map<String, String> generatedConfigs = outputDescriptor.toConfig();
+    assertEquals("eventHub", generatedConfigs.get("streams.output-stream.samza.system"));
+    assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, STREAM_ID)));
+    assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, STREAM_ID)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, STREAM_ID)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, STREAM_ID)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, STREAM_ID)));
+    assertEquals(3, generatedConfigs.size()); // verify that there are no other configs
+  }
+
+  /**
+   * Requesting an output descriptor with neither a namespace nor an entity path must fail with a
+   * {@link ConfigException} whose message names the system and the stream.
+   */
+  @Test
+  public void testMissingOutputDescriptorFields() {
+    // Uses the output stream id; the original carried a copy-pasted "input-stream" here.
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(SYSTEM_NAME);
+    try {
+      systemDescriptor.getOutputDescriptor(STREAM_ID, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
+      fail("Should have thrown Config Exception");
+    } catch (ConfigException exception) {
+      assertEquals(String.format("Missing namespace and entity path Event Hubs output descriptor in "
+          + "system: {%s}, stream: {%s}", SYSTEM_NAME, STREAM_ID), exception.getMessage());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
new file mode 100644
index 0000000..7f73bd9
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub.descriptors;
+
+import java.util.Map;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit tests for the system-level configs generated by {@link EventHubsSystemDescriptor}.
+ */
+public class TestEventHubsSystemDescriptor {
+  private static final String FACTORY_CLASS_NAME = "org.apache.samza.system.eventhub.EventHubSystemFactory";
+  private static final String FACTORY_CONFIG_KEY = "systems.%s.samza.factory";
+
+  private static final String INPUT_STREAM_1 = "input-stream1";
+  private static final String INPUT_STREAM_2 = "input-stream2";
+  private static final String OUTPUT_STREAM_1 = "output-stream1";
+  private static final String OUTPUT_STREAM_2 = "output-stream2";
+  // Stream ids in registration order, as the tests expect them in the stream-list config.
+  private static final String EXPECTED_STREAM_LIST =
+      String.join(",", INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2);
+
+  /**
+   * Every system-level override set on the descriptor must show up in the generated configs,
+   * together with the list of streams obtained from it.
+   */
+  @Test
+  public void testWithDescriptorOverrides() {
+    String systemName = "system-name";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName)
+        .withMaxEventCountPerPoll(1000)
+        .withNumClientThreads(5)
+        .withPartitioningMethod(PartitioningMethod.PARTITION_KEY_AS_PARTITION)
+        .withPrefetchCount(100)
+        .withReceiveQueueSize(500)
+        .withRuntimeInfoTimeout(60000)
+        .withSendKeys(false);
+    registerStreams(systemDescriptor);
+
+    Map<String, String> configs = systemDescriptor.toConfig();
+    assertEquals(FACTORY_CLASS_NAME, lookup(configs, FACTORY_CONFIG_KEY, systemName));
+    assertEquals("1000", lookup(configs, EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName));
+    assertEquals("5", lookup(configs, EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName));
+    assertEquals("PARTITION_KEY_AS_PARTITION", lookup(configs, EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName));
+    assertEquals("100", lookup(configs, EventHubConfig.CONFIG_PREFETCH_COUNT, systemName));
+    assertEquals("500", lookup(configs, EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName));
+    assertEquals("60000", lookup(configs, EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName));
+    assertEquals("false", lookup(configs, EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName));
+    assertEquals(EXPECTED_STREAM_LIST, lookup(configs, EventHubConfig.CONFIG_STREAM_LIST, systemName));
+  }
+
+  /**
+   * A descriptor with no overrides and no streams must emit only the system factory config.
+   */
+  @Test
+  public void testWithoutDescriptorOverrides() {
+    String systemName = "eventHub";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+    Map<String, String> configs = systemDescriptor.toConfig();
+    assertEquals(FACTORY_CLASS_NAME, lookup(configs, FACTORY_CONFIG_KEY, systemName));
+    assertNull(lookup(configs, EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName));
+    assertNull(lookup(configs, EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName));
+    assertNull(lookup(configs, EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName));
+    assertNull(lookup(configs, EventHubConfig.CONFIG_PREFETCH_COUNT, systemName));
+    assertNull(lookup(configs, EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName));
+    assertNull(lookup(configs, EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName));
+    assertNull(lookup(configs, EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName));
+    assertNull(lookup(configs, EventHubConfig.CONFIG_STREAM_LIST, systemName));
+    assertEquals(1, configs.size());
+  }
+
+  /**
+   * Streams obtained from a descriptor without overrides must add exactly the stream-list
+   * config on top of the system factory config.
+   */
+  @Test
+  public void testWithInputOutputStreams() {
+    String systemName = "system-name";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+    registerStreams(systemDescriptor);
+
+    Map<String, String> configs = systemDescriptor.toConfig();
+    assertEquals(FACTORY_CLASS_NAME, lookup(configs, FACTORY_CONFIG_KEY, systemName));
+    assertEquals(EXPECTED_STREAM_LIST, lookup(configs, EventHubConfig.CONFIG_STREAM_LIST, systemName));
+    assertEquals(2, configs.size());
+  }
+
+  /** Obtains two input and two output descriptors from the system descriptor, in stream-list order. */
+  private static void registerStreams(EventHubsSystemDescriptor systemDescriptor) {
+    systemDescriptor.getInputDescriptor(INPUT_STREAM_1, "entity-namespace1", "entity1", newSerde());
+    systemDescriptor.getInputDescriptor(INPUT_STREAM_2, "entity-namespace2", "entity2", newSerde());
+    systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_1, "entity-namespace3", "entity3", newSerde());
+    systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_2, "entity-namespace4", "entity4", newSerde());
+  }
+
+  /** Creates a fresh String/Integer serde pair for each descriptor. */
+  private static KVSerde<String, Integer> newSerde() {
+    return KVSerde.of(new StringSerde(), new IntegerSerde());
+  }
+
+  /** Returns the config value stored under {@code keyTemplate} formatted with {@code systemName}. */
+  private static String lookup(Map<String, String> configs, String keyTemplate, String systemName) {
+    return configs.get(String.format(keyTemplate, systemName));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
deleted file mode 100644
index 5416af5..0000000
--- a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import org.apache.samza.config.Config;
-import org.apache.samza.context.ApplicationContainerContext;
-import org.apache.samza.context.ApplicationContainerContextFactory;
-import org.apache.samza.context.ApplicationTaskContext;
-import org.apache.samza.context.ApplicationTaskContextFactory;
-import org.apache.samza.metrics.MetricsReporterFactory;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.runtime.ProcessorLifecycleListener;
-import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.Serde;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * This is the base class that implements interface {@link ApplicationDescriptor}.
- * <p>
- * This base class contains the common objects that are used by both high-level and low-level API applications, such as
- * {@link Config}, {@link ApplicationContainerContextFactory}, {@link ApplicationTaskContextFactory}, and
- * {@link ProcessorLifecycleListenerFactory}.
- *
- * @param <S> the type of {@link ApplicationDescriptor} interface this implements. It has to be either
- *            {@link StreamApplicationDescriptor} or {@link TaskApplicationDescriptor}
- */
-public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
-    implements ApplicationDescriptor<S> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationDescriptorImpl.class);
-
-  private final Class<? extends SamzaApplication> appClass;
-  private final Map<String, MetricsReporterFactory> reporterFactories = new LinkedHashMap<>();
-  // serdes used by input/output/intermediate streams, keyed by streamId
-  private final Map<String, KV<Serde, Serde>> streamSerdes = new HashMap<>();
-  // serdes used by tables, keyed by tableId
-  private final Map<String, KV<Serde, Serde>> tableSerdes = new HashMap<>();
-  final Config config;
-
-  private Optional<ApplicationContainerContextFactory<?>> applicationContainerContextFactoryOptional = Optional.empty();
-  private Optional<ApplicationTaskContextFactory<?>> applicationTaskContextFactoryOptional = Optional.empty();
-
-  // Default to no-op ProcessorLifecycleListenerFactory
-  ProcessorLifecycleListenerFactory listenerFactory = (pcontext, cfg) -> new ProcessorLifecycleListener() { };
-
-  /**
-   * Creates the descriptor for {@code app}, remembering its class and the supplied {@code config}.
-   *
-   * @param app the user application; only its class is retained
-   * @param config the {@link Config} for the application
-   */
-  ApplicationDescriptorImpl(SamzaApplication app, Config config) {
-    this.config = config;
-    this.appClass = app.getClass();
-  }
-
-  @Override
-  public Config getConfig() {
-    return config;
-  }
-
-  @Override
-  public S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory) {
-    this.applicationContainerContextFactoryOptional = Optional.of(factory);
-    return (S) this;
-  }
-
-  @Override
-  public S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory) {
-    this.applicationTaskContextFactoryOptional = Optional.of(factory);
-    return (S) this;
-  }
-
-  @Override
-  public S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory) {
-    this.listenerFactory = listenerFactory;
-    return (S) this;
-  }
-
-  @Override
-  public S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories) {
-    // Replace, rather than merge with, any previously supplied factories.
-    this.reporterFactories.clear();
-    this.reporterFactories.putAll(reporterFactories);
-    return (S) this;
-  }
-
-  /**
-   * Get the application class
-   *
-   * @return an implementation of {@link SamzaApplication}
-   */
-  public Class<? extends SamzaApplication> getAppClass() {
-    return appClass;
-  }
-
-  /**
-   * Get the {@link ApplicationContainerContextFactory} specified by the application.
-   *
-   * @return {@link ApplicationContainerContextFactory} if application specified it; empty otherwise
-   */
-  public Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> getApplicationContainerContextFactory() {
-    @SuppressWarnings("unchecked") // ok because all context types are at least ApplicationContainerContext
-    Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> factoryOptional =
-        (Optional) this.applicationContainerContextFactoryOptional;
-    return factoryOptional;
-  }
-
-  /**
-   * Get the {@link ApplicationTaskContextFactory} specified by the application.
-   *
-   * @return {@link ApplicationTaskContextFactory} if application specified it; empty otherwise
-   */
-  public Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> getApplicationTaskContextFactory() {
-    @SuppressWarnings("unchecked") // ok because all context types are at least ApplicationTaskContext
-    Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> factoryOptional =
-        (Optional) this.applicationTaskContextFactoryOptional;
-    return factoryOptional;
-  }
-
-  /**
-   * Get the {@link ProcessorLifecycleListenerFactory} associated with this application
-   *
-   * @return the {@link ProcessorLifecycleListenerFactory} in this application
-   */
-  public ProcessorLifecycleListenerFactory getProcessorLifecycleListenerFactory() {
-    return listenerFactory;
-  }
-
-  /**
-   * Get the {@link MetricsReporterFactory}s used in the application
-   *
-   * @return an unmodifiable view of the map of {@link MetricsReporterFactory}s
-   */
-  public Map<String, MetricsReporterFactory> getMetricsReporterFactories() {
-    return Collections.unmodifiableMap(reporterFactories);
-  }
-
-  /**
-   * Get the default {@link SystemDescriptor} in this application
-   *
-   * @return the default {@link SystemDescriptor}; empty in this base implementation
-   */
-  public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
-    // default is not set
-    return Optional.empty();
-  }
-
-  /**
-   * Get the key and value {@link Serde}s registered for the stream {@code streamId}
-   *
-   * @param streamId id of the stream
-   * @return the key/value {@link Serde} pair for the stream. null if no serdes are registered for {@code streamId}
-   */
-  public KV<Serde, Serde> getStreamSerdes(String streamId) {
-    return streamSerdes.get(streamId);
-  }
-
-  /**
-   * Get the key and value {@link Serde}s registered for the table {@code tableId}
-   *
-   * @param tableId id of the table
-   * @return the key/value {@link Serde} pair for the table. null if no serdes are registered for {@code tableId}
-   */
-  public KV<Serde, Serde> getTableSerdes(String tableId) {
-    return tableSerdes.get(tableId);
-  }
-
-  /**
-   * Get the map of all {@link InputOperatorSpec}s in this application
-   *
-   * @return an immutable map from streamId to {@link InputOperatorSpec}. Default to empty map for low-level {@link TaskApplication}
-   */
-  public Map<String, InputOperatorSpec> getInputOperators() {
-    // Type-safe equivalent of the raw Collections.EMPTY_MAP constant.
-    return Collections.emptyMap();
-  }
-
-  /**
-   * Get all the {@link InputDescriptor}s to this application
-   *
-   * @return an immutable map of streamId to {@link InputDescriptor}
-   */
-  public abstract Map<String, InputDescriptor> getInputDescriptors();
-
-  /**
-   * Get all the {@link OutputDescriptor}s from this application
-   *
-   * @return an immutable map of streamId to {@link OutputDescriptor}
-   */
-  public abstract Map<String, OutputDescriptor> getOutputDescriptors();
-
-  /**
-   * Get all the broadcast streamIds from this application
-   *
-   * @return an immutable set of streamIds
-   */
-  public abstract Set<String> getBroadcastStreams();
-
-  /**
-   * Get all the {@link TableDescriptor}s in this application
-   *
-   * @return an immutable set of {@link TableDescriptor}s
-   */
-  public abstract Set<TableDescriptor> getTableDescriptors();
-
-  /**
-   * Get all the unique {@link SystemDescriptor}s in this application
-   *
-   * @return an immutable set of {@link SystemDescriptor}s
-   */
-  public abstract Set<SystemDescriptor> getSystemDescriptors();
-
-  /**
-   * Get all the unique input streamIds in this application
-   *
-   * @return an immutable set of input streamIds
-   */
-  public abstract Set<String> getInputStreamIds();
-
-  /**
-   * Get all the unique output streamIds in this application
-   *
-   * @return an immutable set of output streamIds
-   */
-  public abstract Set<String> getOutputStreamIds();
-
-  /**
-   * Registers the serdes for {@code streamId} on first use and returns the registered pair.
-   * <p>
-   * A {@link KVSerde} is split into its key and value serdes; any other serde is used as the value serde with a
-   * {@link NoOpSerde} for the key.
-   *
-   * @param streamId id of the stream
-   * @param serde the serde supplied for the stream
-   * @return the key/value {@link Serde} pair registered for {@code streamId}
-   * @throws IllegalArgumentException if serdes are already registered for {@code streamId} and differ from {@code serde}
-   */
-  KV<Serde, Serde> getOrCreateStreamSerdes(String streamId, Serde serde) {
-    Serde keySerde, valueSerde;
-
-    KV<Serde, Serde> currentSerdePair = streamSerdes.get(streamId);
-
-    if (serde instanceof KVSerde) {
-      keySerde = ((KVSerde) serde).getKeySerde();
-      valueSerde = ((KVSerde) serde).getValueSerde();
-    } else {
-      keySerde = new NoOpSerde();
-      valueSerde = serde;
-    }
-
-    if (currentSerdePair == null) {
-      if (keySerde instanceof NoOpSerde) {
-        LOGGER.info("Using NoOpSerde as the key serde for stream {}. Keys will not be (de)serialized", streamId);
-      }
-      if (valueSerde instanceof NoOpSerde) {
-        LOGGER.info("Using NoOpSerde as the value serde for stream {}. Values will not be (de)serialized", streamId);
-      }
-      streamSerdes.put(streamId, KV.of(keySerde, valueSerde));
-    } else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
-      throw new IllegalArgumentException(String.format("Serde for stream %s is already defined. Cannot change it to "
-          + "different serdes.", streamId));
-    }
-    return streamSerdes.get(streamId);
-  }
-
-  /**
-   * Registers the key and value serdes of {@code kvSerde} for {@code tableId} on first use and returns the
-   * registered pair.
-   *
-   * @param tableId id of the table
-   * @param kvSerde the {@link KVSerde} supplied for the table
-   * @return the key/value {@link Serde} pair registered for {@code tableId}
-   * @throws IllegalArgumentException if serdes are already registered for {@code tableId} and differ from {@code kvSerde}
-   */
-  KV<Serde, Serde> getOrCreateTableSerdes(String tableId, KVSerde kvSerde) {
-    Serde keySerde = kvSerde.getKeySerde();
-    Serde valueSerde = kvSerde.getValueSerde();
-
-    KV<Serde, Serde> currentSerdePair = tableSerdes.get(tableId);
-    if (currentSerdePair == null) {
-      tableSerdes.put(tableId, KV.of(keySerde, valueSerde));
-    } else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
-      throw new IllegalArgumentException(String.format("Serde for table %s is already defined. Cannot change it to "
-          + "different serdes.", tableId));
-    }
-    // Look up in tableSerdes: the original returned streamSerdes.get(tableId) on the already-registered path.
-    return tableSerdes.get(tableId);
-  }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java
deleted file mode 100644
index acf4bf7..0000000
--- a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import org.apache.samza.config.Config;
-
-
-/**
- * Static helper that builds the {@link ApplicationDescriptorImpl} matching a given {@link SamzaApplication}.
- */
-public class ApplicationDescriptorUtil {
-
-  // Static utility holder; never instantiated.
-  private ApplicationDescriptorUtil() {
-  }
-
-  /**
-   * Builds the {@link ApplicationDescriptorImpl} for the supplied application and configuration.
-   *
-   * @param app the user's {@link SamzaApplication}; must be a {@link StreamApplication} or a {@link TaskApplication}
-   * @param config the {@link Config} for the application
-   * @return a {@link StreamApplicationDescriptorImpl} or {@link TaskApplicationDescriptorImpl} wrapping {@code app}
-   *         and {@code config}
-   * @throws IllegalArgumentException if {@code app} is neither a {@link StreamApplication} nor a {@link TaskApplication}
-   */
-  public static ApplicationDescriptorImpl<? extends ApplicationDescriptor> getAppDescriptor(SamzaApplication app, Config config) {
-    final ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor;
-    if (app instanceof StreamApplication) {
-      descriptor = new StreamApplicationDescriptorImpl((StreamApplication) app, config);
-    } else if (app instanceof TaskApplication) {
-      descriptor = new TaskApplicationDescriptorImpl((TaskApplication) app, config);
-    } else {
-      throw new IllegalArgumentException(String.format("User application class %s is not supported. Only StreamApplication "
-          + "and TaskApplication are supported.", app.getClass().getName()));
-    }
-    return descriptor;
-  }
-
-}
\ No newline at end of file