Posted to dev@storm.apache.org by rban1 <gi...@git.apache.org> on 2017/02/21 21:56:06 UTC

[GitHub] storm pull request #1951: STORM-2371 Implementing new eventhub driver

GitHub user rban1 opened a pull request:

    https://github.com/apache/storm/pull/1951

    STORM-2371 Implementing new eventhub driver

    Changing the underlying receiver implementation to use the latest EventHub receiver

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rban1/storm master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1951.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1951
    
----
commit 098272d753f406754ad17f4ba3ecd6a08881d82c
Author: Ranjan Banerjee <ra...@microsoft.com>
Date:   2017-02-21T21:51:16Z

    Implementing new eventhub driver

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1951: STORM-2371 Implementing new eventhub driver

Posted by SreeramGarlapati <gi...@git.apache.org>.
Github user SreeramGarlapati commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1951#discussion_r102773626
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java ---
    @@ -92,50 +110,34 @@ public boolean isOpen() {
       }
     
       @Override
    -  public EventData receive(long timeoutInMilliseconds) {
    +  public EventData receive() {
         long start = System.currentTimeMillis();
    -    Message message = receiver.receive(timeoutInMilliseconds);
    +    Iterable<com.microsoft.azure.eventhubs.EventData> receivedEvents=null;
    +        /*Get one message at a time for backward compatibility behaviour*/
    +    try {
    +      receivedEvents = receiver.receive(1).get();
    +    }catch (Exception e){
    +      logger.error("Exception occured during receive"+e.toString());
    +    }
         long end = System.currentTimeMillis();
         long millis = (end - start);
         receiveApiLatencyMean.update(millis);
         receiveApiCallCount.incr();
    -    
    -    if (message == null) {
    -      //Temporary workaround for AMQP/EH bug of failing to receive messages
    -      /*if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) {
    -        throw new RuntimeException(
    -            "Restart EventHubSpout due to failure of receiving messages in "
    -            + millis + " millisecond");
    -      }*/
    +    if (receivedEvents == null) {
           return null;
         }
    -
         receiveMessageCount.incr();
    -
    -    MessageId messageId = createMessageId(message);
    -    return EventData.create(message, messageId);
    -  }
    -  
    -  private MessageId createMessageId(Message message) {
    -    String offset = null;
    -    long sequenceNumber = 0;
    -
    -    for (Section section : message.getPayload()) {
    -      if (section instanceof MessageAnnotations) {
    -        MessageAnnotations annotations = (MessageAnnotations) section;
    -        HashMap annonationMap = (HashMap) annotations.getValue();
    -
    -        if (annonationMap.containsKey(OffsetKey)) {
    -          offset = (String) annonationMap.get(OffsetKey);
    -        }
    -
    -        if (annonationMap.containsKey(SequenceNumberKey)) {
    -          sequenceNumber = (Long) annonationMap.get(SequenceNumberKey);
    -        }
    -      }
    +    MessageId messageId=null;
    +    Message message=null;
    +    for (com.microsoft.azure.eventhubs.EventData receivedEvent : receivedEvents) {
    --- End diff --
    
    >for (com.microsoft.azure.eventhubs.EventData receivedEvent : receivedEvents) { [](start = 4, length = 78)
    
    Why? The receive(1) call above explicitly asks for one message, so this loop is unnecessary. Please remove it.
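
A minimal sketch of the alternative to the loop, assuming the receive call hands back an Iterable that holds at most one element and may be null or empty when nothing arrived. The names `FirstEventSketch` and `firstOrNull` are hypothetical and belong to neither the Event Hubs client nor this pull request; plain strings stand in for events.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

public class FirstEventSketch {

    // Returns the first element of the iterable, or null when the iterable
    // is null or empty. Hypothetical helper, not an Event Hubs API.
    public static <T> T firstOrNull(Iterable<T> events) {
        if (events == null) {
            return null;
        }
        Iterator<T> it = events.iterator();
        return it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        String one = firstOrNull(Arrays.asList("event-0"));
        String none = firstOrNull(Collections.<String>emptyList());
        System.out.println(one);   // the single received event
        System.out.println(none);  // null: nothing was received
    }
}
```

With a helper like this, the message id and payload could be built from one value instead of being reassigned inside a loop body.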



[GitHub] storm pull request #1951: STORM-2371 Implementing new eventhub driver

Posted by SreeramGarlapati <gi...@git.apache.org>.
Github user SreeramGarlapati commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1951#discussion_r102769617
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java ---
    @@ -84,12 +82,39 @@ public void prepare(Map config, TopologyContext context,
     	@Override
     	public void execute(Tuple tuple) {
     		try {
    -			sender.send(boltConfig.getEventDataFormat().serialize(tuple));
    +			EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple));
    +			if(boltConfig.getPartitionMode() && sender!=null)
    +				sender.send(sendEvent).get();
    +			else if(boltConfig.getPartitionMode() && sender==null)
    +				throw new EventHubException("Sender is null");
    +			else if(!boltConfig.getPartitionMode() && ehClient!=null)
    +				ehClient.send(sendEvent).get();
    +			else if(!boltConfig.getPartitionMode() && ehClient==null)
    +				throw new EventHubException("ehclient is null");
     			collector.ack(tuple);
     		} catch (EventHubException ex) {
     			collector.reportError(ex);
     			collector.fail(tuple);
     		}
    +		catch (Exception e){
    +
    +		}
    --- End diff --
    
    Why a catch-all?
    Swallowing every exception here could turn this bolt into a black hole :)
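
A minimal sketch of the concern, assuming the intent is that every failure path reports the error and fails the tuple. The `Collector` and `Sender` interfaces below are hypothetical stand-ins that model only the `ack`, `fail` and `reportError` calls seen in the diff; they are not the Storm or Event Hubs types.

```java
import java.util.ArrayList;
import java.util.List;

public class FailLoudlySketch {

    // Hypothetical stand-in for the collector used by the bolt.
    public interface Collector {
        void ack(String tuple);
        void fail(String tuple);
        void reportError(Throwable error);
    }

    // Hypothetical stand-in for the send call, which may throw anything.
    public interface Sender {
        void send(String tuple) throws Exception;
    }

    // Any exception is reported and the tuple is failed, so nothing is
    // dropped silently.
    public static void execute(String tuple, Sender sender, Collector collector) {
        try {
            sender.send(tuple);
            collector.ack(tuple);
        } catch (Exception e) {
            collector.reportError(e);
            collector.fail(tuple);
        }
    }

    // Collector that records calls so the behaviour can be inspected.
    public static class RecordingCollector implements Collector {
        public final List<String> calls = new ArrayList<>();
        public void ack(String tuple) { calls.add("ack:" + tuple); }
        public void fail(String tuple) { calls.add("fail:" + tuple); }
        public void reportError(Throwable error) { calls.add("error:" + error.getMessage()); }
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();
        execute("t1", t -> { }, collector);
        execute("t2", t -> { throw new IllegalStateException("sender is null"); }, collector);
        System.out.println(collector.calls);
    }
}
```

An empty catch block, by contrast, would leave the second tuple neither acked nor failed, which is the black-hole behaviour described above.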



[GitHub] storm pull request #1951: STORM-2371 Implementing new eventhub driver

Posted by SreeramGarlapati <gi...@git.apache.org>.
Github user SreeramGarlapati commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1951#discussion_r102770531
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java ---
    @@ -65,24 +69,38 @@ public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) {
       }
     
       @Override
    -  public void open(IEventHubFilter filter) throws EventHubException {
    -    logger.info("creating eventhub receiver: partitionId=" + partitionId + 
    -    		", filterString=" + filter.getFilterString());
    +  public void open(String offset) throws EventHubException {
    +    logger.info("creating eventhub receiver: partitionId=" + partitionId +
    +            ", offset=" + offset);
         long start = System.currentTimeMillis();
    -    receiver = new ResilientEventHubReceiver(connectionString, entityName,
    -    		partitionId, consumerGroupName, defaultCredits, filter);
    -    receiver.initialize();
    -    
    +    try {
    +      ehClient = EventHubClient.createFromConnectionString(connectionString).get();
    +      receiver = ehClient.createEpochReceiver(
    +              consumerGroupName,
    +              partitionId,
    +              offset,
    +              false,
    +              1).get();
    +    }catch (Exception e){
    +      logger.info("Exception in creating EventhubClient"+e.toString());
    +    }
         long end = System.currentTimeMillis();
         logger.info("created eventhub receiver, time taken(ms): " + (end-start));
       }
     
       @Override
    -  public void close() {
    +  public void close(){
         if(receiver != null) {
    -      receiver.close();
    +      try {
    +        receiver.close().get();
    --- End diff --
    
    >.get() [](start = 24, length = 6)
    
    If receiver.close() fails, this results in a connection leak; please handle that case. Something like:
    receiver.whenComplete(() -> { ehClient.closeSync() }).get();
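
A minimal sketch of the chaining idea using only the JDK's `CompletableFuture`, assuming the receiver's close call returns a future (the diff calls `.get()` on it). `CloseChainSketch` and `closeBoth` are hypothetical names, and a `Runnable` stands in for closing the client; no Event Hubs types are used.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseChainSketch {

    // Runs closeClient once receiverClose finishes, whether it succeeded or
    // failed. The returned future still carries the receiver's outcome.
    public static CompletableFuture<Void> closeBoth(CompletableFuture<Void> receiverClose,
                                                    Runnable closeClient) {
        return receiverClose.whenComplete((ignored, error) -> closeClient.run());
    }

    public static void main(String[] args) {
        AtomicBoolean clientClosed = new AtomicBoolean(false);

        // Simulate a receiver whose close fails.
        CompletableFuture<Void> failingClose = new CompletableFuture<>();
        failingClose.completeExceptionally(new IllegalStateException("receiver close failed"));

        CompletableFuture<Void> result = closeBoth(failingClose, () -> clientClosed.set(true));

        System.out.println("client closed: " + clientClosed.get());
        System.out.println("receiver failure preserved: " + result.isCompletedExceptionally());
    }
}
```

Note that `whenComplete` takes a two-argument callback (result, error), so the client is closed on both the success and the failure path.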



[GitHub] storm pull request #1951: STORM-2371 Implementing new eventhub driver

Posted by SreeramGarlapati <gi...@git.apache.org>.
Github user SreeramGarlapati commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1951#discussion_r102774770
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java ---
    @@ -92,50 +110,34 @@ public boolean isOpen() {
       }
     
       @Override
    -  public EventData receive(long timeoutInMilliseconds) {
    +  public EventData receive() {
         long start = System.currentTimeMillis();
    -    Message message = receiver.receive(timeoutInMilliseconds);
    +    Iterable<com.microsoft.azure.eventhubs.EventData> receivedEvents=null;
    +        /*Get one message at a time for backward compatibility behaviour*/
    +    try {
    +      receivedEvents = receiver.receive(1).get();
    +    }catch (Exception e){
    +      logger.error("Exception occured during receive"+e.toString());
    +    }
         long end = System.currentTimeMillis();
         long millis = (end - start);
         receiveApiLatencyMean.update(millis);
         receiveApiCallCount.incr();
    -    
    -    if (message == null) {
    -      //Temporary workaround for AMQP/EH bug of failing to receive messages
    -      /*if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) {
    -        throw new RuntimeException(
    -            "Restart EventHubSpout due to failure of receiving messages in "
    -            + millis + " millisecond");
    -      }*/
    +    if (receivedEvents == null) {
           return null;
         }
    -
         receiveMessageCount.incr();
    -
    -    MessageId messageId = createMessageId(message);
    -    return EventData.create(message, messageId);
    -  }
    -  
    -  private MessageId createMessageId(Message message) {
    -    String offset = null;
    -    long sequenceNumber = 0;
    -
    -    for (Section section : message.getPayload()) {
    -      if (section instanceof MessageAnnotations) {
    -        MessageAnnotations annotations = (MessageAnnotations) section;
    -        HashMap annonationMap = (HashMap) annotations.getValue();
    -
    -        if (annonationMap.containsKey(OffsetKey)) {
    -          offset = (String) annonationMap.get(OffsetKey);
    -        }
    -
    -        if (annonationMap.containsKey(SequenceNumberKey)) {
    -          sequenceNumber = (Long) annonationMap.get(SequenceNumberKey);
    -        }
    -      }
    +    MessageId messageId=null;
    +    Message message=null;
    +    for (com.microsoft.azure.eventhubs.EventData receivedEvent : receivedEvents) {
    +      messageId = new MessageId(partitionId,
    +              receivedEvent.getSystemProperties().getOffset(),
    +              receivedEvent.getSystemProperties().getSequenceNumber());
    +      List<Section> body = new ArrayList<Section>();
    +      body.add(new Data(new Binary((new String(receivedEvent.getBody(), Charset.defaultCharset())).getBytes())));
    +      message = new Message(body);
    --- End diff --
    
    This is not equivalent to the existing behavior. The Message could carry other AMQP sections, such as ApplicationProperties and SystemProperties.
    Ideally, you should return com.microsoft.azure.eventhubs.EventData. Please remove the EventData type created in the spout library to eliminate confusion.



[GitHub] storm pull request #1951: STORM-2371 Implementing new eventhub driver

Posted by SreeramGarlapati <gi...@git.apache.org>.
Github user SreeramGarlapati commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1951#discussion_r102768443
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java ---
    @@ -70,10 +69,9 @@ public void prepare(Map config, TopologyContext context,
     		logger.info("creating sender: " + boltConfig.getConnectionString()
     				+ ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
     		try {
    -			EventHubClient eventHubClient = EventHubClient.create(
    -					boltConfig.getConnectionString(),
    -					boltConfig.getEntityPath());
    -			sender = eventHubClient.createPartitionSender(myPartitionId);
    +			ehClient = EventHubClient.createFromConnectionString(boltConfig.getConnectionString()).get();
    --- End diff --
    
    >get [](start = 90, length = 3)
    
    Use the methods ending with Sync(...), which could return cleaner exception stacks.
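
A minimal sketch of why blocking on a future with `.get()` tends to give noisier stacks: the JDK wraps the original failure in an `ExecutionException`. The `getSync` helper below is hypothetical and only illustrates the unwrapping idea; it is an assumption for illustration, not how the Event Hubs client implements its synchronous methods.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SyncUnwrapSketch {

    // Hypothetical "sync" wrapper: waits for the future and rethrows the
    // original cause instead of the ExecutionException wrapper.
    public static <T> T getSync(CompletableFuture<T> future) throws Exception {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalArgumentException("bad connection string"));

        try {
            failed.get();
        } catch (Exception e) {
            System.out.println("get():     " + e.getClass().getSimpleName());
        }

        try {
            getSync(failed);
        } catch (Exception e) {
            System.out.println("getSync(): " + e.getClass().getSimpleName());
        }
    }
}
```

The first call surfaces an `ExecutionException`, the second the original `IllegalArgumentException`, so a caller can catch the specific exception type directly.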



[GitHub] storm pull request #1951: STORM-2371 Implementing new eventhub driver

Posted by SreeramGarlapati <gi...@git.apache.org>.
Github user SreeramGarlapati commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1951#discussion_r102767172
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java ---
    @@ -22,10 +22,8 @@
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import com.microsoft.eventhubs.client.EventHubClient;
    +import com.microsoft.azure.eventhubs.*;
    --- End diff --
    
    >.*; [](start = 36, length = 3)
    
    nit: keep the imports expanded; the usual pattern in open source is to import each class explicitly



[GitHub] storm issue #1951: STORM-2371 Implementing new eventhub driver

Posted by SreeramGarlapati <gi...@git.apache.org>.
Github user SreeramGarlapati commented on the issue:

    https://github.com/apache/storm/pull/1951
  
    :clock1:

