You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/04/10 22:39:24 UTC
samza git commit: Upgrade to latest event hub version (1.0.1)
Repository: samza
Updated Branches:
refs/heads/master 8e000f398 -> fda1e37d0
Upgrade to latest event hub version (1.0.1)
* Upgrade to the latest event hub version 1.0.1
* Adding configs for prefetchCount and maxEventCountPerPoll
* Fix the high cpu usage issue in SamzaHistogram
* Fixing a race condition in the event hub system producer where the future was being removed while it was being checked for completion, resulting in an NPE.
Author: Srinivasulu Punuru <sp...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #467 from srinipunuru/upgrade-eh-1.0.1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fda1e37d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fda1e37d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fda1e37d
Branch: refs/heads/master
Commit: fda1e37d0862493cb59ed0907c597a2dc973ef6f
Parents: 8e000f3
Author: Srinivasulu Punuru <sp...@linkedin.com>
Authored: Tue Apr 10 15:39:21 2018 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Tue Apr 10 15:39:21 2018 -0700
----------------------------------------------------------------------
build.gradle | 2 +-
.../versioned/jobs/configuration-table.html | 28 ++-
.../eventhub/EventHubClientManagerFactory.java | 3 +-
.../samza/system/eventhub/EventHubConfig.java | 38 ++++
.../eventhub/SamzaEventHubClientManager.java | 37 ++--
.../eventhub/admin/EventHubSystemAdmin.java | 8 +-
.../consumer/EventHubSystemConsumer.java | 179 ++++++++++---------
.../system/eventhub/metrics/SamzaHistogram.java | 69 ++++---
.../eventhub/producer/AsyncSystemProducer.java | 1 -
.../producer/EventHubSystemProducer.java | 7 +-
.../samza/system/eventhub/MockEventData.java | 21 ++-
.../MockEventHubClientManagerFactory.java | 25 ++-
.../system/eventhub/TestMetricsRegistry.java | 7 +-
.../consumer/TestEventHubSystemConsumer.java | 6 +-
.../producer/ITestEventHubSystemProducer.java | 10 +-
.../producer/TestEventHubSystemProducer.java | 4 +-
.../samza/tools/EventHubConsoleConsumer.java | 64 ++++---
17 files changed, 322 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index d96ec96..2f27a03 100644
--- a/build.gradle
+++ b/build.gradle
@@ -202,7 +202,7 @@ project(':samza-azure') {
dependencies {
compile "com.microsoft.azure:azure-storage:5.3.1"
- compile "com.microsoft.azure:azure-eventhubs:0.14.5"
+ compile "com.microsoft.azure:azure-eventhubs:1.0.1"
compile "com.fasterxml.jackson.core:jackson-core:2.8.8"
compile "io.dropwizard.metrics:metrics-core:3.1.2"
compile project(':samza-api')
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 5c41596..4495cb1 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -2279,30 +2279,48 @@
</tr>
<tr>
- <td class="property" id="eventhub-stream-namespace">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.namespace</td>
+ <td class="property" id="eventhub-stream-namespace">streams.<span class="stream">stream-id</span>.<br>eventhubs.namespace</td>
<td class="default"></td>
<td class="description">Namespace of the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td>
</tr>
<tr>
- <td class="property" id="eventhub-stream-entity">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.entitypath</td>
+ <td class="property" id="eventhub-stream-entity">streams.<span class="stream">stream-id</span>.<br>eventhubs.entitypath</td>
<td class="default"></td>
<td class="description">Entity of the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td>
</tr>
<tr>
- <td class="property" id="eventhub-stream-sas-keyname">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.keyname</td>
+ <td class="property" id="eventhub-stream-sas-keyname">sensitive.streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.keyname</td>
<td class="default"></td>
<td class="description">SAS Keyname of the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td>
</tr>
<tr>
- <td class="property" id="eventhub-stream-sas-token">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.token</td>
+ <td class="property" id="eventhub-stream-sas-token">sensitive.streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.token</td>
<td class="default"></td>
<td class="description">SAS Token the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td>
</tr>
<tr>
+ <td class="property" id="eventhub-client-threads">streams.<span class="system">stream-name</span>.<br>eventhubs.numClientThreads</td>
+ <td class="default">10</td>
+ <td class="description">Number of threads in thread pool that will be used by the EventHubClient. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_hub_client.create">here </a> for more details.</td>
+ </tr>
+
+ <tr>
+ <td class="property" id="eventhub-prefetch-count">systems.<span class="system">system-name</span>.<br>eventhubs.prefetchCount</td>
+ <td class="default">999</td>
+ <td class="description">Number of events that EventHub client should prefetch from the server. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._partition_receiver.setprefetchcount">here </a> for more details.</td>
+ </tr>
+
+ <tr>
+ <td class="property" id="eventhub-max-event-count">systems.<span class="system">system-name</span>.<br>eventhubs.maxEventCountPerPoll</td>
+ <td class="default">50</td>
+ <td class="description">Maximum number of events that EventHub client can return in a receive call. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._partition_receive_handler.getmaxeventcount#com_microsoft_azure_eventhubs__partition_receive_handler_getMaxEventCount__">here </a> for more details.</td>
+ </tr>
+
+ <tr>
<td class="property" id="eventhub-runtime-timeout">systems.<span class="system">system-name</span>.<br>eventhubs.runtime.info.timeout</td>
<td class="default">60000</td>
<td class="description">Timeout for fetching the runtime metadata from an Eventhub entity on startup in millis.</td>
@@ -2333,7 +2351,7 @@
</tr>
<tr>
- <td class="property" id="eventhub-consumer-group">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.consumer.group</td>
+ <td class="property" id="eventhub-consumer-group">streams.<span class="stream">stream-id</span>.<br>eventhubs.consumer.group</td>
<td class="default"><code>$Default</code></td>
<td class="description">
Consumer only config. Set the consumer group from the upstream Eventhub that the consumer is part of. Defaults to the <code>$Default</code> group that is initially present in all Eventhub entities (unless removed)
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java
index 0578a50..879a07f 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java
@@ -26,7 +26,8 @@ public class EventHubClientManagerFactory {
String entityPath = config.getStreamEntityPath(systemName, streamName);
String sasKeyName = config.getStreamSasKeyName(systemName, streamName);
String sasToken = config.getStreamSasToken(systemName, streamName);
+ int numClientThreads = config.getNumClientThreads(systemName);
- return new SamzaEventHubClientManager(eventHubNamespace, entityPath, sasKeyName, sasToken);
+ return new SamzaEventHubClientManager(eventHubNamespace, entityPath, sasKeyName, sasToken, numClientThreads);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
index 7df92c0..6639dd8 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -20,6 +20,7 @@
package org.apache.samza.system.eventhub;
import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
@@ -49,6 +50,15 @@ public class EventHubConfig extends MapConfig {
public static final String CONFIG_STREAM_CONSUMER_GROUP = "streams.%s.eventhubs.consumer.group";
public static final String DEFAULT_CONFIG_STREAM_CONSUMER_GROUP = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
+ public static final String CONFIG_SYSTEM_NUM_CLIENT_THREADS = "streams.%s.eventhubs.numClientThreads";
+ public static final int DEFAULT_CONFIG_SYSTEM_NUM_CLIENT_THREADS = 10;
+
+ public static final String CONFIG_PREFETCH_COUNT = "systems.%s.eventhubs.prefetchCount";
+ public static final int DEFAULT_CONFIG_PREFETCH_COUNT = PartitionReceiver.DEFAULT_PREFETCH_COUNT;
+
+ public static final String CONFIG_MAX_EVENT_COUNT_PER_POLL = "systems.%s.eventhubs.maxEventCountPerPoll";
+ public static final int DEFAULT_CONFIG_MAX_EVENT_COUNT_PER_POLL = 50;
+
public static final String CONFIG_PRODUCER_PARTITION_METHOD = "systems.%s.eventhubs.partition.method";
public static final String DEFAULT_CONFIG_PRODUCER_PARTITION_METHOD = EventHubSystemProducer
.PartitioningMethod.EVENT_HUB_HASHING.name();
@@ -144,6 +154,34 @@ public class EventHubConfig extends MapConfig {
}
/**
+ * Get the number of client threads. This is used to create the ThreadPool executor that is passed to the
+ * {@link EventHubClient#create}
+ * @param systemName Name of the system.
+ * @return Num of client threads to use.
+ */
+ public Integer getNumClientThreads(String systemName) {
+ return getInt(String.format(CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName), DEFAULT_CONFIG_SYSTEM_NUM_CLIENT_THREADS);
+ }
+
+ /**
+ * Get the max event count returned per poll
+ * @param systemName Name of the system
+ * @return Max number of events returned per poll
+ */
+ public Integer getMaxEventCountPerPoll(String systemName) {
+ return getInt(String.format(CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName), DEFAULT_CONFIG_MAX_EVENT_COUNT_PER_POLL);
+ }
+
+ /**
+ * Get the per partition prefetch count for the event hub client
+ * @param systemName Name of the system.
+ * @return Per partition Prefetch count for the event hub client.
+ */
+ public Integer getPrefetchCount(String systemName) {
+ return getInt(String.format(CONFIG_PREFETCH_COUNT, systemName), DEFAULT_CONFIG_PREFETCH_COUNT);
+ }
+
+ /**
* Get the EventHubs max Message size
*
* @param systemName name of the system
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
index 977e252..a884a79 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
@@ -19,12 +19,15 @@
package org.apache.samza.system.eventhub;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.microsoft.azure.eventhubs.EventHubClient;
-import com.microsoft.azure.servicebus.ClientConstants;
-import com.microsoft.azure.servicebus.ConnectionStringBuilder;
-import com.microsoft.azure.servicebus.RetryExponential;
-import com.microsoft.azure.servicebus.RetryPolicy;
-import com.microsoft.azure.servicebus.ServiceBusException;
+import com.microsoft.azure.eventhubs.impl.ClientConstants;
+import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
+import com.microsoft.azure.eventhubs.impl.RetryExponential;
+import com.microsoft.azure.eventhubs.RetryPolicy;
+import com.microsoft.azure.eventhubs.EventHubException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.samza.SamzaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +46,7 @@ public class SamzaEventHubClientManager implements EventHubClientManager {
private static final Duration MAX_RETRY_BACKOFF = Duration.ofMillis(11000);
private static final int MAX_RETRY_COUNT = 100;
private static final String SAMZA_EVENTHUB_RETRY = "SAMZA_CONNECTOR_RETRY";
+ private final int numClientThreads;
private EventHubClient eventHubClient;
@@ -51,30 +55,38 @@ public class SamzaEventHubClientManager implements EventHubClientManager {
private final String sasKeyName;
private final String sasKey;
private final RetryPolicy retryPolicy;
+ private ExecutorService eventHubClientExecutor;
- public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey) {
+ public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey,
+ Integer numClientThreads) {
this(eventHubNamespace, entityPath, sasKeyName, sasKey,
- new RetryExponential(MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF, MAX_RETRY_COUNT, SAMZA_EVENTHUB_RETRY));
+ new RetryExponential(MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF, MAX_RETRY_COUNT, SAMZA_EVENTHUB_RETRY), numClientThreads);
}
public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey,
- RetryPolicy retryPolicy) {
+ RetryPolicy retryPolicy, int numClientThreads) {
this.eventHubNamespace = eventHubNamespace;
this.entityPath = entityPath;
this.sasKeyName = sasKeyName;
this.sasKey = sasKey;
this.retryPolicy = retryPolicy;
+ this.numClientThreads = numClientThreads;
}
@Override
public void init() {
String remoteHost = String.format(EVENTHUB_REMOTE_HOST_FORMAT, eventHubNamespace);
try {
- ConnectionStringBuilder connectionStringBuilder =
- new ConnectionStringBuilder(eventHubNamespace, entityPath, sasKeyName, sasKey);
+ ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder()
+ .setNamespaceName(eventHubNamespace)
+ .setEventHubName(entityPath)
+ .setSasKeyName(sasKeyName)
+ .setSasKey(sasKey);
- eventHubClient = EventHubClient.createFromConnectionStringSync(connectionStringBuilder.toString(), retryPolicy);
- } catch (IOException | ServiceBusException e) {
+ ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder().setNameFormat("Samza EventHubClient Thread-%d");
+ eventHubClientExecutor = Executors.newFixedThreadPool(numClientThreads, threadFactoryBuilder.build());
+ eventHubClient = EventHubClient.createSync(connectionStringBuilder.toString(), retryPolicy, eventHubClientExecutor);
+ } catch (IOException | EventHubException e) {
String msg = String.format("Creation of EventHub client failed for eventHub EntityPath: %s on remote host %s:%d",
entityPath, remoteHost, ClientConstants.AMQPS_PORT);
LOG.error(msg, e);
@@ -92,6 +104,7 @@ public class SamzaEventHubClientManager implements EventHubClientManager {
try {
if (timeoutMS == EventHubClientManager.BLOCK_UNTIL_CLOSE) {
eventHubClient.closeSync();
+ eventHubClientExecutor.shutdown();
} else {
CompletableFuture<Void> future = eventHubClient.close();
future.get(timeoutMS, TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index 5564747..acb1775 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -19,8 +19,8 @@
package org.apache.samza.system.eventhub.admin;
-import com.microsoft.azure.eventhubs.EventHubPartitionRuntimeInformation;
import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
+import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.system.SystemAdmin;
@@ -144,10 +144,10 @@ public class EventHubSystemAdmin implements SystemAdmin {
private Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadata(String streamName, String[] partitionIds) {
EventHubClientManager eventHubClientManager = getOrCreateStreamEventHubClient(streamName);
Map<Partition, SystemStreamPartitionMetadata> sspMetadataMap = new HashMap<>();
- Map<String, CompletableFuture<EventHubPartitionRuntimeInformation>> ehRuntimeInfos = new HashMap<>();
+ Map<String, CompletableFuture<PartitionRuntimeInformation>> ehRuntimeInfos = new HashMap<>();
for (String partition : partitionIds) {
- CompletableFuture<EventHubPartitionRuntimeInformation> partitionRuntimeInfo = eventHubClientManager
+ CompletableFuture<PartitionRuntimeInformation> partitionRuntimeInfo = eventHubClientManager
.getEventHubClient()
.getPartitionRuntimeInformation(partition);
@@ -157,7 +157,7 @@ public class EventHubSystemAdmin implements SystemAdmin {
ehRuntimeInfos.forEach((partitionId, ehPartitionRuntimeInfo) -> {
try {
long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName);
- EventHubPartitionRuntimeInformation ehPartitionInfo = ehPartitionRuntimeInfo.get(timeoutMs, TimeUnit.MILLISECONDS);
+ PartitionRuntimeInformation ehPartitionInfo = ehPartitionRuntimeInfo.get(timeoutMs, TimeUnit.MILLISECONDS);
// Set offsets
String startingOffset = EventHubSystemConsumer.START_OF_STREAM;
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index 90c73dc..f00944b 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -1,45 +1,31 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.samza.system.eventhub.consumer;
import com.microsoft.azure.eventhubs.EventData;
-import com.microsoft.azure.eventhubs.EventHubPartitionRuntimeInformation;
+import com.microsoft.azure.eventhubs.EventHubException;
+import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
import com.microsoft.azure.eventhubs.PartitionReceiver;
-import com.microsoft.azure.servicebus.ServiceBusException;
-import org.apache.samza.SamzaException;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
-import org.apache.samza.system.eventhub.EventHubClientManager;
-import org.apache.samza.system.eventhub.EventHubConfig;
-import org.apache.samza.system.eventhub.Interceptor;
-import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
-import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
-import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
-import org.apache.samza.util.BlockingEnvelopeMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
+import com.microsoft.azure.eventhubs.impl.ClientConstants;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -51,11 +37,26 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.eventhub.EventHubClientManager;
+import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.apache.samza.system.eventhub.Interceptor;
+import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
+import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
+import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -96,13 +97,12 @@ import java.util.stream.Collectors;
*/
public class EventHubSystemConsumer extends BlockingEnvelopeMap {
private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemConsumer.class);
- private static final int MAX_EVENT_COUNT_PER_PARTITION_POLL = 50;
// Overall timeout for EventHubClient exponential backoff policy
private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10L);
private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
- public static final String START_OF_STREAM = PartitionReceiver.START_OF_STREAM; // -1
+ public static final String START_OF_STREAM = ClientConstants.START_OF_STREAM; // -1
public static final String END_OF_STREAM = "-2";
public static final String EVENT_READ_RATE = "eventReadRate";
public static final String EVENT_BYTE_READ_RATE = "eventByteReadRate";
@@ -122,11 +122,14 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
private final Map<String, SamzaHistogram> readLatencies;
private final Map<String, Counter> readErrors;
- final ConcurrentHashMap<SystemStreamPartition, PartitionReceiveHandler> streamPartitionHandlers = new ConcurrentHashMap<>();
- private final ConcurrentHashMap<SystemStreamPartition, PartitionReceiver> streamPartitionReceivers = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<SystemStreamPartition, PartitionReceiveHandler> streamPartitionHandlers =
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<SystemStreamPartition, PartitionReceiver> streamPartitionReceivers =
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, EventHubClientManager> streamEventHubManagers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<SystemStreamPartition, String> streamPartitionOffsets = new ConcurrentHashMap<>();
private final Map<String, Interceptor> interceptors;
+ private final Integer prefetchCount;
private boolean isStarted = false;
private final EventHubConfig config;
private final String systemName;
@@ -135,8 +138,8 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
private final AtomicReference<Throwable> eventHubHandlerError = new AtomicReference<>(null);
public EventHubSystemConsumer(EventHubConfig config, String systemName,
- EventHubClientManagerFactory eventHubClientManagerFactory,
- Map<String, Interceptor> interceptors, MetricsRegistry registry) {
+ EventHubClientManagerFactory eventHubClientManagerFactory, Map<String, Interceptor> interceptors,
+ MetricsRegistry registry) {
super(registry, System::currentTimeMillis);
this.config = config;
@@ -145,21 +148,24 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
List<String> streamIds = config.getStreams(systemName);
// Create and initiate connections to Event Hubs
for (String streamId : streamIds) {
- EventHubClientManager eventHubClientManager = eventHubClientManagerFactory
- .getEventHubClientManager(systemName, streamId, config);
+ EventHubClientManager eventHubClientManager =
+ eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, config);
streamEventHubManagers.put(streamId, eventHubClientManager);
eventHubClientManager.init();
}
+ prefetchCount = config.getPrefetchCount(systemName);
+
+
// Initiate metrics
- eventReadRates = streamIds.stream()
- .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE)));
+ eventReadRates =
+ streamIds.stream().collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE)));
eventByteReadRates = streamIds.stream()
- .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE)));
+ .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE)));
readLatencies = streamIds.stream()
- .collect(Collectors.toMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY)));
- readErrors = streamIds.stream()
- .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, READ_ERRORS)));
+ .collect(Collectors.toMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY)));
+ readErrors =
+ streamIds.stream().collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, READ_ERRORS)));
// Locking to ensure that these aggregated metrics will be created only once across multiple system consumers.
synchronized (AGGREGATE_METRICS_LOCK) {
@@ -183,7 +189,9 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
if (streamPartitionOffsets.containsKey(systemStreamPartition)) {
// Only update if new offset is lower than previous offset
- if (END_OF_STREAM.equals(offset)) return;
+ if (END_OF_STREAM.equals(offset)) {
+ return;
+ }
String prevOffset = streamPartitionOffsets.get(systemStreamPartition);
if (!END_OF_STREAM.equals(prevOffset) && EventHubSystemAdmin.compareOffsets(offset, prevOffset) > -1) {
return;
@@ -192,20 +200,20 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
streamPartitionOffsets.put(systemStreamPartition, offset);
}
- private String getNewestEventHubOffset(EventHubClientManager eventHubClientManager, String streamName, Integer partitionId) {
- CompletableFuture<EventHubPartitionRuntimeInformation> partitionRuntimeInfoFuture = eventHubClientManager
- .getEventHubClient()
- .getPartitionRuntimeInformation(partitionId.toString());
+ private String getNewestEventHubOffset(EventHubClientManager eventHubClientManager, String streamName,
+ Integer partitionId) {
+ CompletableFuture<PartitionRuntimeInformation> partitionRuntimeInfoFuture =
+ eventHubClientManager.getEventHubClient().getPartitionRuntimeInformation(partitionId.toString());
try {
long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
- EventHubPartitionRuntimeInformation partitionRuntimeInformation = partitionRuntimeInfoFuture
- .get(timeoutMs, TimeUnit.MILLISECONDS);
+ PartitionRuntimeInformation partitionRuntimeInformation =
+ partitionRuntimeInfoFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
return partitionRuntimeInformation.getLastEnqueuedOffset();
} catch (InterruptedException | ExecutionException | TimeoutException e) {
- String msg = String.format(
- "Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s, Partition:%s",
+ String msg =
+ String.format("Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s, Partition:%s",
systemName, streamName, partitionId);
throw new SamzaException(msg);
}
@@ -235,21 +243,23 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
// If the offset is greater than the newest offset, use the current Instant as the
// offset to fetch in Eventhub.
receiver = eventHubClientManager.getEventHubClient()
- .createReceiverSync(consumerGroup, partitionId.toString(), Instant.now());
+ .createReceiverSync(consumerGroup, partitionId.toString(), EventPosition.fromEnqueuedTime(Instant.now()));
} else {
// If the offset is less or equal to the newest offset in the system, it can be
// used as the starting offset to receive from. EventHub will return the first
// message AFTER the offset that was specified in the fetch request.
// If no such offset exists Eventhub will return an error.
receiver = eventHubClientManager.getEventHubClient()
- .createReceiverSync(consumerGroup, partitionId.toString(), offset,
- !offset.equals(EventHubSystemConsumer.START_OF_STREAM));
+ .createReceiverSync(consumerGroup, partitionId.toString(),
+ EventPosition.fromOffset(offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM)));
}
- PartitionReceiveHandler handler = new PartitionReceiverHandlerImpl(ssp, eventReadRates.get(streamId),
- eventByteReadRates.get(streamId), readLatencies.get(streamId), readErrors.get(streamId),
- interceptors.getOrDefault(streamId, null));
+ receiver.setPrefetchCount(prefetchCount);
+ PartitionReceiveHandler handler =
+ new PartitionReceiverHandlerImpl(ssp, eventReadRates.get(streamId), eventByteReadRates.get(streamId),
+ readLatencies.get(streamId), readErrors.get(streamId), interceptors.getOrDefault(streamId, null),
+ config.getMaxEventCountPerPoll(systemName));
// Timeout for EventHubClient receive
receiver.setReceiveTimeout(DEFAULT_EVENTHUB_RECEIVER_TIMEOUT);
@@ -261,16 +271,16 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
streamPartitionReceivers.put(ssp, receiver);
} catch (Exception e) {
throw new SamzaException(
- String.format("Failed to create receiver for EventHubs: namespace=%s, entity=%s, partitionId=%d",
- namespace, entityPath, partitionId), e);
+ String.format("Failed to create receiver for EventHubs: namespace=%s, entity=%s, partitionId=%d", namespace,
+ entityPath, partitionId), e);
}
LOG.debug(String.format("Connection successfully started for namespace=%s, entity=%s ", namespace, entityPath));
-
}
}
@Override
- public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
+ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+ Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
Throwable handlerError = eventHubHandlerError.get();
if (handlerError != null) {
@@ -305,8 +315,10 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
// Recreate receiver
PartitionReceiver receiver = eventHubClientManager.getEventHubClient()
- .createReceiverSync(consumerGroup, partitionId.toString(), offset,
- !offset.equals(EventHubSystemConsumer.START_OF_STREAM));
+ .createReceiverSync(consumerGroup, partitionId.toString(),
+ EventPosition.fromOffset(offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM)));
+
+ receiver.setPrefetchCount(prefetchCount);
// Timeout for EventHubClient receive
receiver.setReceiveTimeout(DEFAULT_EVENTHUB_RECEIVER_TIMEOUT);
@@ -314,10 +326,9 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
// Create and start receiver thread with handler
receiver.setReceiveHandler(streamPartitionHandlers.get(ssp));
streamPartitionReceivers.put(ssp, receiver);
-
} catch (Exception e) {
eventHubHandlerError.set(new SamzaException(
- String.format("Failed to recreate receiver for EventHubs after ReceiverHandlerError (ssp=%s)", ssp), e));
+ String.format("Failed to recreate receiver for EventHubs after ReceiverHandlerError (ssp=%s)", ssp), e));
}
}
@@ -336,9 +347,9 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
}
private boolean isErrorTransient(Throwable throwable) {
- if (throwable instanceof ServiceBusException) {
- ServiceBusException serviceBusException = (ServiceBusException) throwable;
- return serviceBusException.getIsTransient();
+ if (throwable instanceof EventHubException) {
+ EventHubException eventHubException = (EventHubException) throwable;
+ return eventHubException.getIsTransient();
}
return false;
}
@@ -348,24 +359,30 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
return new LinkedBlockingQueue<>(config.getConsumerBufferCapacity(systemName));
}
- protected class PartitionReceiverHandlerImpl extends PartitionReceiveHandler {
+ protected class PartitionReceiverHandlerImpl implements PartitionReceiveHandler {
private final Counter eventReadRate;
private final Counter eventByteReadRate;
private final SamzaHistogram readLatency;
private final Counter errorRate;
private final Interceptor interceptor;
+ private final Integer maxEventCount;
SystemStreamPartition ssp;
PartitionReceiverHandlerImpl(SystemStreamPartition ssp, Counter eventReadRate, Counter eventByteReadRate,
- SamzaHistogram readLatency, Counter readErrors, Interceptor interceptor) {
- super(MAX_EVENT_COUNT_PER_PARTITION_POLL);
+ SamzaHistogram readLatency, Counter readErrors, Interceptor interceptor, int maxEventCount) {
this.ssp = ssp;
this.eventReadRate = eventReadRate;
this.eventByteReadRate = eventByteReadRate;
this.readLatency = readLatency;
this.errorRate = readErrors;
this.interceptor = interceptor;
+ this.maxEventCount = maxEventCount;
+ }
+
+ @Override
+ public int getMaxEventCount() {
+ return this.maxEventCount;
}
@Override
@@ -415,8 +432,8 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
errorRate.inc();
aggReadErrors.inc();
- if (throwable instanceof ServiceBusException) {
- ServiceBusException busException = (ServiceBusException) throwable;
+ if (throwable instanceof EventHubException) {
+ EventHubException busException = (EventHubException) throwable;
if (busException.getIsTransient()) {
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
index 7d6d408..03fc114 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
@@ -1,35 +1,36 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.samza.system.eventhub.metrics;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Snapshot;
-import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.MetricsRegistry;
-
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.MetricsVisitor;
+
/**
* Creates a {@link Histogram} metric using {@link ExponentiallyDecayingReservoir}
@@ -49,14 +50,34 @@ public class SamzaHistogram {
this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
this.percentiles = percentiles;
this.gauges = this.percentiles.stream()
- .filter(x -> x > 0 && x <= 100)
- .collect(
- Collectors.toMap(Function.identity(), x -> registry.newGauge(group, name + "_" + String.valueOf(0), 0D)));
+ .filter(x -> x > 0 && x <= 100)
+ .collect(Collectors.toMap(Function.identity(),
+ x -> registry.newGauge(group, new HistogramGauge(x, name + "_" + String.valueOf(x), 0D))));
}
public void update(long value) {
histogram.update(value);
+ }
+
+ public void updateGaugeValues(double percentile) {
Snapshot values = histogram.getSnapshot();
- percentiles.forEach(x -> gauges.get(x).set(values.getValue(x / 100)));
+ gauges.get(percentile).set(values.getValue(percentile / 100));
+ }
+
+ /**
+ * Custom gauge whose value is set based on the underlying Histogram
+ */
+ private class HistogramGauge extends Gauge<Double> {
+ private final Double percentile;
+
+ public HistogramGauge(Double percentile, String name, double value) {
+ super(name, value);
+ this.percentile = percentile;
+ }
+
+ public void visit(MetricsVisitor visitor) {
+ updateGaugeValues(percentile);
+ visitor.gauge(this);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
index 901dd6a..307b8f6 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
@@ -139,7 +139,6 @@ public abstract class AsyncSystemProducer implements SystemProducer {
// Auto update the metrics and possible throwable when futures are complete.
sendResult.handle((aVoid, throwable) -> {
- pendingFutures.remove(sendResult);
long callbackLatencyMs = System.currentTimeMillis() - afterSendTimeMs;
sendCallbackLatency.get(streamId).update(callbackLatencyMs);
aggSendCallbackLatency.update(callbackLatencyMs);
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index 5139dc6..3639bbc 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -21,8 +21,9 @@ package org.apache.samza.system.eventhub.producer;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.PartitionSender;
-import com.microsoft.azure.servicebus.ServiceBusException;
+import com.microsoft.azure.eventhubs.impl.EventDataImpl;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
@@ -164,7 +165,7 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
} catch (InterruptedException | ExecutionException | TimeoutException e) {
String msg = "Failed to fetch number of Event Hub partitions for partition sender creation";
throw new SamzaException(msg, e);
- } catch (ServiceBusException | IllegalArgumentException e) {
+ } catch (EventHubException | IllegalArgumentException e) {
String msg = "Creation of partition sender failed with exception";
throw new SamzaException(msg, e);
}
@@ -282,7 +283,7 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
eventValue = interceptor.get().intercept(eventValue);
}
- EventData eventData = new EventData(eventValue);
+ EventData eventData = new EventDataImpl(eventValue);
eventData.getProperties().put(PRODUCE_TIMESTAMP, Long.toString(System.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java
index b5b55dc..1e3d4f5 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java
@@ -21,15 +21,17 @@ package org.apache.samza.system.eventhub;
import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.impl.EventDataImpl;
import java.nio.charset.Charset;
import java.util.*;
-public class MockEventData extends EventData {
+public class MockEventData implements EventData {
+ EventData eventData;
private EventData.SystemProperties overridedSystemProperties;
private MockEventData(byte[] data, String partitionKey, String offset) {
- super(data);
+ eventData = new EventDataImpl(data);
HashMap<String, Object> properties = new HashMap<>();
properties.put("x-opt-offset", offset);
properties.put("x-opt-partition-key", partitionKey);
@@ -51,6 +53,21 @@ public class MockEventData extends EventData {
}
@Override
+ public Object getObject() {
+ return eventData.getObject();
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return eventData.getBytes();
+ }
+
+ @Override
+ public Map<String, Object> getProperties() {
+ return eventData.getProperties();
+ }
+
+ @Override
public EventData.SystemProperties getSystemProperties() {
return overridedSystemProperties;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
index 368087a..6ee9bcf 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
@@ -26,7 +26,6 @@ import org.junit.Assert;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -39,7 +38,7 @@ import static org.mockito.Matchers.*;
public class MockEventHubClientManagerFactory extends EventHubClientManagerFactory {
private Map<SystemStreamPartition, List<EventData>> eventData;
private Map<String, Map<String, Map<Integer, List<EventData>>>> receivedData;
- private Map<String, String> startingOffsets = new HashMap<>();
+ private Map<String, EventPosition> startingOffsets = new HashMap<>();
public MockEventHubClientManagerFactory() {
this.receivedData = new HashMap<>();
@@ -71,7 +70,7 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto
handlers.forEach((ssp, value) -> value.onReceive(eventData.get(ssp)));
}
- public String getPartitionOffset(String partitionId) {
+ public EventPosition getPartitionOffset(String partitionId) {
return startingOffsets.getOrDefault(partitionId, null);
}
@@ -101,10 +100,10 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto
}
return null;
});
- EventHubPartitionRuntimeInformation mockPartitionRuntimeInfo = PowerMockito.mock(EventHubPartitionRuntimeInformation.class);
+ PartitionRuntimeInformation mockPartitionRuntimeInfo = PowerMockito.mock(PartitionRuntimeInformation.class);
PowerMockito.when(mockPartitionRuntimeInfo.getLastEnqueuedOffset())
.thenReturn(EventHubSystemConsumer.START_OF_STREAM);
- CompletableFuture<EventHubPartitionRuntimeInformation> partitionFuture = new MockPartitionFuture(mockPartitionRuntimeInfo);
+ CompletableFuture<PartitionRuntimeInformation> partitionFuture = new MockPartitionFuture(mockPartitionRuntimeInfo);
// Producer mocks
PartitionSender mockPartitionSender0 = PowerMockito.mock(PartitionSender.class);
@@ -128,16 +127,16 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto
try {
// Consumer calls
- PowerMockito.when(mockEventHubClient.createReceiverSync(anyString(), anyString(), any(Instant.class)))
+ PowerMockito.when(mockEventHubClient.createReceiverSync(anyString(), anyString(), anyObject()))
.then((Answer<PartitionReceiver>) invocationOnMock -> {
String partitionId = invocationOnMock.getArgumentAt(1, String.class);
- startingOffsets.put(partitionId, EventHubSystemConsumer.END_OF_STREAM);
+ startingOffsets.put(partitionId, EventPosition.fromEndOfStream());
return mockPartitionReceiver;
});
- PowerMockito.when(mockEventHubClient.createReceiverSync(anyString(), anyString(), anyString(), anyBoolean()))
+ PowerMockito.when(mockEventHubClient.createReceiverSync(anyString(), anyString(), anyObject()))
.then((Answer<PartitionReceiver>) invocationOnMock -> {
String partitionId = invocationOnMock.getArgumentAt(1, String.class);
- String offset = invocationOnMock.getArgumentAt(2, String.class);
+ EventPosition offset = invocationOnMock.getArgumentAt(2, EventPosition.class);
startingOffsets.put(partitionId, offset);
return mockPartitionReceiver;
});
@@ -196,15 +195,15 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto
}
}
- private class MockPartitionFuture extends CompletableFuture<EventHubPartitionRuntimeInformation> {
- EventHubPartitionRuntimeInformation runtimeInformation;
+ private class MockPartitionFuture extends CompletableFuture<PartitionRuntimeInformation> {
+ PartitionRuntimeInformation runtimeInformation;
- MockPartitionFuture(EventHubPartitionRuntimeInformation runtimeInformation) {
+ MockPartitionFuture(PartitionRuntimeInformation runtimeInformation) {
this.runtimeInformation = runtimeInformation;
}
@Override
- public EventHubPartitionRuntimeInformation get(long timeout, TimeUnit unit) {
+ public PartitionRuntimeInformation get(long timeout, TimeUnit unit) {
return runtimeInformation;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
index a421cbd..d29b975 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
@@ -70,7 +70,12 @@ public class TestMetricsRegistry implements MetricsRegistry {
@Override
public <T> Gauge<T> newGauge(String group, Gauge<T> value) {
- return null;
+ if (!gauges.containsKey(group)) {
+ gauges.put(group, new ArrayList<>());
+ }
+
+ gauges.get(group).add(value);
+ return value;
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
index 4fced77..b40d86d 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
@@ -42,7 +42,7 @@ import java.util.stream.Collectors;
import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({EventHubRuntimeInformation.class, EventHubPartitionRuntimeInformation.class,
+@PrepareForTest({EventHubRuntimeInformation.class, PartitionRuntimeInformation.class,
EventHubClient.class, PartitionReceiver.class, PartitionSender.class})
public class TestEventHubSystemConsumer {
private static final String MOCK_ENTITY_1 = "mocktopic1";
@@ -99,8 +99,8 @@ public class TestEventHubSystemConsumer {
consumer.register(ssp, EventHubSystemConsumer.START_OF_STREAM);
consumer.start();
- Assert.assertEquals(EventHubSystemConsumer.START_OF_STREAM,
- eventHubClientWrapperFactory.getPartitionOffset(String.valueOf(partitionId)));
+ Assert.assertEquals(EventPosition.fromOffset(EventHubSystemConsumer.START_OF_STREAM, false).toString(),
+ eventHubClientWrapperFactory.getPartitionOffset(String.valueOf(partitionId)).toString());
}
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java
index 32b1604..63f6daa 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java
@@ -21,13 +21,13 @@ package org.apache.samza.system.eventhub.producer;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubException;
+import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiver;
-import com.microsoft.azure.servicebus.ServiceBusException;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.system.*;
import org.apache.samza.system.eventhub.*;
-import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.Assert;
import org.junit.Ignore;
@@ -92,7 +92,7 @@ public class ITestEventHubSystemProducer {
}
@Test
- public void testReceive() throws ServiceBusException {
+ public void testReceive() throws EventHubException {
EventHubClientManagerFactory clientFactory = new EventHubClientManagerFactory();
EventHubClientManager wrapper = clientFactory
.getEventHubClientManager(SYSTEM_NAME, STREAM_NAME1, new EventHubConfig(createEventHubConfig()));
@@ -100,11 +100,11 @@ public class ITestEventHubSystemProducer {
EventHubClient client = wrapper.getEventHubClient();
PartitionReceiver receiver =
client.createReceiverSync(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, "0",
- EventHubSystemConsumer.START_OF_STREAM, true);
+ EventPosition.fromStartOfStream());
receiveMessages(receiver, 300);
}
- private void receiveMessages(PartitionReceiver receiver, int numMessages) throws ServiceBusException {
+ private void receiveMessages(PartitionReceiver receiver, int numMessages) throws EventHubException {
int count = 0;
while (count < numMessages) {
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
index b5206bb..9a3bf7d 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
@@ -20,9 +20,9 @@
package org.apache.samza.system.eventhub.producer;
import com.microsoft.azure.eventhubs.EventHubClient;
-import com.microsoft.azure.eventhubs.EventHubPartitionRuntimeInformation;
import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
import com.microsoft.azure.eventhubs.PartitionReceiver;
+import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
import com.microsoft.azure.eventhubs.PartitionSender;
import java.util.ArrayList;
import java.util.HashMap;
@@ -49,7 +49,7 @@ import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({EventHubRuntimeInformation.class, EventHubPartitionRuntimeInformation.class, EventHubClient.class, PartitionReceiver.class, PartitionSender.class})
+@PrepareForTest({EventHubRuntimeInformation.class, PartitionRuntimeInformation.class, EventHubClient.class, PartitionReceiver.class, PartitionSender.class})
public class TestEventHubSystemProducer {
private static final String SOURCE = "TestEventHubSystemProducer";
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java b/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java
index 096e12d..a72c8b7 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java
@@ -1,38 +1,41 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.samza.tools;
+import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
+import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiver;
-import com.microsoft.azure.servicebus.ConnectionStringBuilder;
-import com.microsoft.azure.servicebus.ServiceBusException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
+
/**
* Tool to read events from Microsoft Azure event hubs.
*/
@@ -59,17 +62,17 @@ public class EventHubConsoleConsumer {
private static final String OPT_DESC_TOKEN = "Token corresponding to the key.";
public static void main(String[] args)
- throws ServiceBusException, IOException, ExecutionException, InterruptedException {
+ throws EventHubException, IOException, ExecutionException, InterruptedException {
Options options = new Options();
options.addOption(
CommandLineHelper.createOption(OPT_SHORT_EVENTHUB_NAME, OPT_LONG_EVENTHUB_NAME, OPT_ARG_EVENTHUB_NAME, true,
- OPT_DESC_EVENTHUB_NAME));
+ OPT_DESC_EVENTHUB_NAME));
- options.addOption(
- CommandLineHelper.createOption(OPT_SHORT_NAMESPACE, OPT_LONG_NAMESPACE, OPT_ARG_NAMESPACE, true, OPT_DESC_NAMESPACE));
+ options.addOption(CommandLineHelper.createOption(OPT_SHORT_NAMESPACE, OPT_LONG_NAMESPACE, OPT_ARG_NAMESPACE, true,
+ OPT_DESC_NAMESPACE));
- options.addOption(
- CommandLineHelper.createOption(OPT_SHORT_KEY_NAME, OPT_LONG_KEY_NAME, OPT_ARG_KEY_NAME, true, OPT_DESC_KEY_NAME));
+ options.addOption(CommandLineHelper.createOption(OPT_SHORT_KEY_NAME, OPT_LONG_KEY_NAME, OPT_ARG_KEY_NAME, true,
+ OPT_DESC_KEY_NAME));
options.addOption(
CommandLineHelper.createOption(OPT_SHORT_TOKEN, OPT_LONG_TOKEN, OPT_ARG_TOKEN, true, OPT_DESC_TOKEN));
@@ -93,17 +96,20 @@ public class EventHubConsoleConsumer {
}
private static void consumeEvents(String ehName, String namespace, String keyName, String token)
- throws ServiceBusException, IOException, ExecutionException, InterruptedException {
- ConnectionStringBuilder connStr = new ConnectionStringBuilder(namespace, ehName, keyName, token);
+ throws EventHubException, IOException, ExecutionException, InterruptedException {
+ ConnectionStringBuilder connStr = new ConnectionStringBuilder().setNamespaceName(namespace)
+ .setEventHubName(ehName)
+ .setSasKeyName(keyName)
+ .setSasKey(token);
- EventHubClient client = EventHubClient.createFromConnectionStringSync(connStr.toString());
+ EventHubClient client = EventHubClient.createSync(connStr.toString(), Executors.newFixedThreadPool(10));
EventHubRuntimeInformation runTimeInfo = client.getRuntimeInformation().get();
int numPartitions = runTimeInfo.getPartitionCount();
for (int partition = 0; partition < numPartitions; partition++) {
PartitionReceiver receiver =
client.createReceiverSync(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, String.valueOf(partition),
- PartitionReceiver.START_OF_STREAM);
+ EventPosition.fromStartOfStream());
receiver.receive(10).handle((records, throwable) -> handleComplete(receiver, records, throwable));
}
}