Posted to dev@storm.apache.org by SreeramGarlapati <gi...@git.apache.org> on 2018/03/07 06:17:54 UTC

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

GitHub user SreeramGarlapati opened a pull request:

    https://github.com/apache/storm/pull/2588

    Microsoft Azure EventHubs Storm Spout and Bolt improvements

    This is a continuation of the work done by @raviperi.
    
    - Update to the latest version of the EventHubs Java client
    - Introduce config params to use the latest EH client, control the request prefetch size, and set the batch size of events received per call
    - Refactor the code to group classes more appropriately
    - Remove redundant types
    - Add Javadoc comments where applicable
    - Prefetch config parameter to dictate the EH prefetch count
    - Config parameter to introduce a sleep between the spout's nextTuple calls
    - Config parameter to retrieve a batched number of events per call to EH
      (as opposed to a single event)
    - New data scheme to group the event payload and audit params into a single
      type, and expose that type as the only tuple field to downstream
      bolts.
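The last item, a single tuple value that bundles the event body with its audit metadata, can be pictured as a small immutable type. This is a minimal sketch under stated assumptions: the class name `EventHubMessageSketch` and the chosen metadata fields (partition ID, offset, sequence number) are hypothetical and not taken from the PR.

```java
import java.io.Serializable;

/**
 * Hypothetical single tuple value carrying the event body together with
 * audit metadata, so downstream bolts read one field instead of several.
 * Field names are illustrative assumptions, not the PR's actual type.
 */
final class EventHubMessageSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // All fields are final, so an emitted message cannot change under a downstream bolt.
    private final byte[] payload;
    private final String partitionId;
    private final String offset;
    private final long sequenceNumber;

    EventHubMessageSketch(byte[] payload, String partitionId, String offset, long sequenceNumber) {
        // Defensive copy: later changes to the caller's array do not leak into this message.
        this.payload = payload == null ? new byte[0] : payload.clone();
        this.partitionId = partitionId;
        this.offset = offset;
        this.sequenceNumber = sequenceNumber;
    }

    byte[] getPayload() {
        // Return a copy so callers cannot mutate the stored payload either.
        return payload.clone();
    }

    String getPartitionId() {
        return partitionId;
    }

    String getOffset() {
        return offset;
    }

    long getSequenceNumber() {
        return sequenceNumber;
    }

    @Override
    public String toString() {
        // Print the payload size rather than its contents, since the payload may not be text.
        return "EventHubMessageSketch{partitionId=" + partitionId + ", offset=" + offset
                + ", sequenceNumber=" + sequenceNumber + ", payloadBytes=" + payload.length + "}";
    }
}
```

A spout would then declare a single output field and emit one such object per event, leaving downstream bolts to pick the properties they need.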

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/SreeramGarlapati/storm master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2588.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2588
    
----
commit e9fc45d3d604b65ab3e99895b4a7711bf1b7ab30
Author: Ravi Peri <ra...@...>
Date:   2017-09-11T21:35:48Z

    Introduce config params to dictate EH request prefetch size, max number
    of events received per call.
    
    -Refactor the code to group classes more appropriately
    -Remove redundant types
    -Javadoc comments where applicable
    -Prefetch config parameter to dictate EH prefetch count
    -config parameter to introduce sleep between spout's nextTuple calls
    -config parameter to retrieve a batched number of events per call to EH
    (opposed to single event)
    -New data scheme to group event payload and audit params into a single
    type, and expose the single type as the only tuple field to downstream
    bolts.

commit 3f8f37f65551414afdaaf48935b662e9e42836be
Author: Ravi Peri <ra...@...>
Date:   2017-09-13T18:07:18Z

    Remove unnecessary file.

commit 9a18f9a8d1629f34fac648e05a3ae61d388e0670
Author: Ravi Peri <ra...@...>
Date:   2017-09-14T17:09:35Z

    Add license header, remove next tuple sleep time as it is redundant.

commit 5a7eb2e1f5d5f1a3f65e1530b284f364e618f99f
Author: Ravi Peri <ra...@...>
Date:   2017-09-14T17:14:08Z

    Remove unused nextTuple sleep interval field

commit 6ba9d11a9343c47add523e55415238b6df010de5
Author: Ravi Peri <ra...@...>
Date:   2017-09-19T19:27:32Z

    Add javadoc for EventHubConfig. Remove unused fields. Improve logging when returning empty/null values

commit 53569ac4ccdca5e0978ac3b8eae2423c97657197
Author: Ravi Peri <ra...@...>
Date:   2017-09-19T20:33:20Z

    Organize usings. Remove extra constructor and fields in EventHubSpoutConfig

commit b1bde9b592024dcf1eaa70365c010cc56cd99313
Author: Ravi Peri <ra...@...>
Date:   2017-09-20T18:30:46Z

    Remove obsolete Binary and EventDataSchemes, and add unit tests for supported schemes. Enable scheme based serialization in spout

commit 4516d01487e4dcb2211ea713fb160f6a45bd34fd
Author: Ravi Peri <ra...@...>
Date:   2017-09-20T18:52:50Z

    Mark all fields final in EventHubMessage

commit 2063f4547939dfde459d8241b80e3b262e5e1816
Author: Ravi Peri <ra...@...>
Date:   2017-09-20T19:15:40Z

    Update batch size log message to debug level

commit 480e2fcdaa4e14f503cde1552a11d53830a21cda
Author: Sreeram Garlapati <sr...@...>
Date:   2018-03-07T03:18:56Z

    Merge branch 'master' of https://github.com/raviperi/storm
    
    # Conflicts:
    #	external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
    #	external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/FieldConstants.java
    #	external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/format/StringEventDataScheme.java
    #	external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
    #	external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubException.java
    #	external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
    #	external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
    #	external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
    #	pom.xml

commit a61466d4f4cfda92aea3157abbeec4b3646cdd5d
Author: Sreeram Garlapati <sr...@...>
Date:   2018-03-07T05:32:14Z

    Remove files as a continuation of the merge commit

commit 19bd1c9ddd13f73cc8e6ddfc2d987c7c8ea58efd
Author: Sreeram Garlapati <sr...@...>
Date:   2018-03-07T06:09:12Z

    move the library to 1.0.0

----


---

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173631986
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java ---
    @@ -17,95 +17,185 @@
      *******************************************************************************/
     package org.apache.storm.eventhubs.bolt;
     
    -import com.microsoft.azure.servicebus.ConnectionStringBuilder;
    -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
    -
     import java.io.Serializable;
    +import java.net.URI;
    +import java.net.URISyntaxException;
     
    -import java.io.Serializable;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.format.DefaultEventDataFormat;
    +import org.apache.storm.eventhubs.format.IEventDataFormat;
    +
    +import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
     
    -/*
    +/**
      * EventHubs bolt configurations
    + * <p>
    + * Partition mode: partitionMode=true, in this mode each bolt task will write to
    + * a partition with the same id as that of the task index. For this mode, the
    + * number of bolt tasks must match the number of partitions.
    + * <p>
    + * partitionMode=false, default setting. There is no affinity between bolt tasks
    + * and partitions. Events are written to partitions as determined by the
    + * EventHub partitioning logic.
      *
    - * Partition mode:
    - * With partitionMode=true you need to create the same number of tasks as the number of 
    - * EventHubs partitions, and each bolt task will only send data to one partition.
    - * The partition ID is the task ID of the bolt.
    - * 
    - * Event format:
    - * The formatter to convert tuple to bytes for EventHubs.
    - * if null, the default format is common delimited tuple fields.
    + * @see IEventDataFormat
      */
     public class EventHubBoltConfig implements Serializable {
    -  private static final long serialVersionUID = 1L;
    -  
    -  private String connectionString;
    -  private final String entityPath;
    -  protected boolean partitionMode;
    -  protected IEventDataFormat dataFormat;
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath) {
    -    this(connectionString, entityPath, false, null);
    -  }
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath,
    -      boolean partitionMode) {
    -    this(connectionString, entityPath, partitionMode, null);
    -  }
    -  
    -  public EventHubBoltConfig(String userName, String password, String namespace,
    -      String entityPath, boolean partitionMode) {
    -    this(userName, password, namespace,
    -        EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
    -  }
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath,
    -      boolean partitionMode, IEventDataFormat dataFormat) {
    -    this.connectionString = connectionString;
    -    this.entityPath = entityPath;
    -    this.partitionMode = partitionMode;
    -    this.dataFormat = dataFormat;
    -    if(this.dataFormat == null) {
    -      this.dataFormat = new DefaultEventDataFormat();
    +    private static final long serialVersionUID = 1L;
    +
    +    private String connectionString;
    +    protected boolean partitionMode;
    +    protected IEventDataFormat dataFormat;
    +
    --- End diff --
    
    Nit: You might consider writing this class using the Builder pattern. I think it could reduce the number of constructors a lot.
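To illustrate the suggestion, a Builder keeps the required values in one entry point and turns every optional setting into a fluent setter with a default, replacing the overloaded constructors. This is a minimal sketch, not the storm-eventhubs API: `BoltConfigSketch`, `TupleFormat` and `DelimitedTupleFormat` are hypothetical stand-ins for `EventHubBoltConfig`, `IEventDataFormat` and `DefaultEventDataFormat`, and the comma-delimited default is an assumption.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical stand-in for IEventDataFormat (the real interface serializes a Storm Tuple).
interface TupleFormat extends Serializable {
    byte[] serialize(List<Object> values);
}

// Hypothetical default formatter: comma-delimited field values (assumed, not taken from the PR).
final class DelimitedTupleFormat implements TupleFormat {
    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(List<Object> values) {
        return values.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(","))
                .getBytes(StandardCharsets.UTF_8);
    }
}

/** Builder-based bolt config: one private constructor instead of several overloads. */
final class BoltConfigSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String connectionString;
    private final String entityPath;
    private final boolean partitionMode;
    private final TupleFormat dataFormat;

    private BoltConfigSketch(Builder builder) {
        this.connectionString = builder.connectionString;
        this.entityPath = builder.entityPath;
        this.partitionMode = builder.partitionMode;
        this.dataFormat = builder.dataFormat;
    }

    // Required values go into builder(); optional ones get fluent setters with defaults.
    static Builder builder(String connectionString, String entityPath) {
        return new Builder(connectionString, entityPath);
    }

    String getConnectionString() { return connectionString; }
    String getEntityPath() { return entityPath; }
    boolean isPartitionMode() { return partitionMode; }
    TupleFormat getDataFormat() { return dataFormat; }

    static final class Builder {
        private final String connectionString;
        private final String entityPath;
        private boolean partitionMode = false;                       // default: no task/partition affinity
        private TupleFormat dataFormat = new DelimitedTupleFormat(); // default formatter

        private Builder(String connectionString, String entityPath) {
            this.connectionString = Objects.requireNonNull(connectionString, "connectionString");
            this.entityPath = Objects.requireNonNull(entityPath, "entityPath");
        }

        Builder partitionMode(boolean partitionMode) {
            this.partitionMode = partitionMode;
            return this;
        }

        Builder dataFormat(TupleFormat dataFormat) {
            this.dataFormat = Objects.requireNonNull(dataFormat, "dataFormat");
            return this;
        }

        BoltConfigSketch build() {
            return new BoltConfigSketch(this);
        }
    }
}
```

Usage would look like `BoltConfigSketch.builder(connectionString, "myhub").partitionMode(true).build()`. Adding another optional setting later means one more builder method rather than another constructor overload.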


---

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173631364
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java ---
    @@ -31,116 +30,131 @@
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.Map;
    -import java.util.concurrent.ExecutionException;
    -
    -import java.util.Map;
    -import java.util.concurrent.ExecutionException;
    +import com.microsoft.azure.eventhubs.EventData;
    +import com.microsoft.azure.eventhubs.EventHubClient;
    +import com.microsoft.azure.eventhubs.PartitionSender;
    +import com.microsoft.azure.eventhubs.EventHubException;
     
     /**
      * A bolt that writes event message to EventHub.
    + * <p>
    + * <p>
    + * The implementation has two modes of operation:
    + * <ul>
    + * <li>partitionmode = true, One bolt for per partition write.</li>
    + * <li>partitionmode = false, use default partitioning key strategy to write to
    + * partition(s)</li>
    + * </ul>
    + * </p>
      */
     public class EventHubBolt extends BaseRichBolt {
    -	private static final long serialVersionUID = 1L;
    -	private static final Logger logger = LoggerFactory
    -			.getLogger(EventHubBolt.class);
    -
    -	protected OutputCollector collector;
    -	protected PartitionSender sender;
    -	protected EventHubClient ehClient;
    -	protected EventHubBoltConfig boltConfig;
    -
    -	public EventHubBolt(String connectionString, String entityPath) {
    -		boltConfig = new EventHubBoltConfig(connectionString, entityPath);
    -	}
    -
    -	public EventHubBolt(String userName, String password, String namespace,
    -			String entityPath, boolean partitionMode) {
    -		boltConfig = new EventHubBoltConfig(userName, password, namespace,
    -				entityPath, partitionMode);
    -	}
    -
    -	public EventHubBolt(EventHubBoltConfig config) {
    -		boltConfig = config;
    -	}
    -
    -	@Override
    -	public void prepare(Map<String, Object> config, TopologyContext context,
    -			OutputCollector collector) {
    -		this.collector = collector;
    -		String myPartitionId = null;
    -		if (boltConfig.getPartitionMode()) {
    -			// We can use the task index (starting from 0) as the partition ID
    -			myPartitionId = "" + context.getThisTaskIndex();
    -		}
    -		logger.info("creating sender: " + boltConfig.getConnectionString()
    -				+ ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
    -		try {
    -			ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
    -			if (boltConfig.getPartitionMode()) {
    -				sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
    -			}
    -		} catch (Exception ex) {
    -			collector.reportError(ex);
    -			throw new RuntimeException(ex);
    -		}
    -
    -	}
    -
    -	@Override
    -	public void execute(Tuple tuple) {
    -		try {
    -			EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple));
    -			if (boltConfig.getPartitionMode() && sender!=null) {
    -				sender.sendSync(sendEvent);
    -			}
    -			else if (boltConfig.getPartitionMode() && sender==null) {
    -				throw new EventHubException("Sender is null");
    -			}
    -			else if (!boltConfig.getPartitionMode() && ehClient!=null) {
    -				ehClient.sendSync(sendEvent);
    -			}
    -			else if (!boltConfig.getPartitionMode() && ehClient==null) {
    -				throw new EventHubException("ehclient is null");
    -			}
    -			collector.ack(tuple);
    -		} catch (EventHubException ex ) {
    -			collector.reportError(ex);
    -			collector.fail(tuple);
    -		} catch (ServiceBusException e) {
    -			collector.reportError(e);
    -			collector.fail(tuple);
    -		}
    -	}
    -
    -	@Override
    -	public void cleanup() {
    -		if(sender != null) {
    -			try {
    -				sender.close().whenComplete((voidargs,error)->{
    -					try{
    -						if(error!=null){
    -							logger.error("Exception during sender cleanup phase"+error.toString());
    -						}
    -						ehClient.closeSync();
    -					}catch (Exception e){
    -						logger.error("Exception during ehclient cleanup phase"+e.toString());
    -					}
    -				}).get();
    -			} catch (InterruptedException e) {
    -				logger.error("Exception occured during cleanup phase"+e.toString());
    -			} catch (ExecutionException e) {
    -				logger.error("Exception occured during cleanup phase"+e.toString());
    -			}
    -			logger.info("Eventhub Bolt cleaned up");
    -			sender = null;
    -			ehClient =  null;
    -		}
    -	}
    -
    -	@Override
    -	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    -
    -	}
    -
    +    private static final long serialVersionUID = 1L;
    +    private static final Logger logger = LoggerFactory.getLogger(EventHubBolt.class);
    +
    +    private ExecutorService executorService;
    +    protected OutputCollector collector;
    +    protected EventHubClient ehClient;
    +    protected PartitionSender sender;
    +    protected EventHubBoltConfig boltConfig;
    +
    +    /**
    +     * Constructs an instance that uses the specified connection string to connect
    +     * to an EventHub and write to the specified entityPath
    +     *
    +     * @param connectionString EventHub connection String
    +     * @param entityPath       entity path to write to
    +     */
    +    public EventHubBolt(String connectionString, String entityPath) {
    +        boltConfig = new EventHubBoltConfig(connectionString, entityPath);
    +    }
    +
    +    /**
    +     * Constructs an instance that connects to an EventHub using the specified
    +     * connection credentials.
    +     *
    +     * @param userName      UserName to connect as
    +     * @param password      Password to use
    +     * @param namespace     target namespace for the service bus
    +     * @param entityPath    Name of the event hub
    +     * @param partitionMode number of partitions
    +     */
    +    public EventHubBolt(String userName, String password, String namespace, String entityPath, boolean partitionMode) {
    +        boltConfig = new EventHubBoltConfig(userName, password, namespace, entityPath, partitionMode);
    +    }
    +
    +    /**
    +     * Constructs an instance using the specified configuration
    +     *
    +     * @param config EventHub connection and partition configuration
    +     */
    +    public EventHubBolt(EventHubBoltConfig config) {
    +        boltConfig = config;
    +    }
    +
    +    @Override
    +    public void prepare(Map<String, Object> config, TopologyContext context,
    +                        OutputCollector collector) {
    +        this.collector = collector;
    +        logger.info(String.format("Conn String: %s, PartitionMode: %s", boltConfig.getConnectionString(),
    --- End diff --
    
    The logger can do the formatting itself. Use `{}` placeholders instead of String.format (e.g. `logger.info("Connection String: {}", boltConfig.getConnectionString())`).


---

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173632056
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java ---
    @@ -17,95 +17,185 @@
      *******************************************************************************/
     package org.apache.storm.eventhubs.bolt;
     
    -import com.microsoft.azure.servicebus.ConnectionStringBuilder;
    -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
    -
     import java.io.Serializable;
    +import java.net.URI;
    +import java.net.URISyntaxException;
     
    -import java.io.Serializable;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.format.DefaultEventDataFormat;
    +import org.apache.storm.eventhubs.format.IEventDataFormat;
    +
    +import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
     
    -/*
    +/**
      * EventHubs bolt configurations
    + * <p>
    + * Partition mode: partitionMode=true, in this mode each bolt task will write to
    + * a partition with the same id as that of the task index. For this mode, the
    + * number of bolt tasks must match the number of partitions.
    + * <p>
    + * partitionMode=false, default setting. There is no affinity between bolt tasks
    + * and partitions. Events are written to partitions as determined by the
    + * EventHub partitioning logic.
      *
    - * Partition mode:
    - * With partitionMode=true you need to create the same number of tasks as the number of 
    - * EventHubs partitions, and each bolt task will only send data to one partition.
    - * The partition ID is the task ID of the bolt.
    - * 
    - * Event format:
    - * The formatter to convert tuple to bytes for EventHubs.
    - * if null, the default format is common delimited tuple fields.
    + * @see IEventDataFormat
      */
     public class EventHubBoltConfig implements Serializable {
    -  private static final long serialVersionUID = 1L;
    -  
    -  private String connectionString;
    -  private final String entityPath;
    -  protected boolean partitionMode;
    -  protected IEventDataFormat dataFormat;
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath) {
    -    this(connectionString, entityPath, false, null);
    -  }
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath,
    -      boolean partitionMode) {
    -    this(connectionString, entityPath, partitionMode, null);
    -  }
    -  
    -  public EventHubBoltConfig(String userName, String password, String namespace,
    -      String entityPath, boolean partitionMode) {
    -    this(userName, password, namespace,
    -        EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
    -  }
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath,
    -      boolean partitionMode, IEventDataFormat dataFormat) {
    -    this.connectionString = connectionString;
    -    this.entityPath = entityPath;
    -    this.partitionMode = partitionMode;
    -    this.dataFormat = dataFormat;
    -    if(this.dataFormat == null) {
    -      this.dataFormat = new DefaultEventDataFormat();
    +    private static final long serialVersionUID = 1L;
    +
    +    private String connectionString;
    +    protected boolean partitionMode;
    +    protected IEventDataFormat dataFormat;
    +
    +    /**
    +     * Constructs an instance with specified connection string, and eventhub name
    +     * The @link {@link #partitionMode} is set to false.
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param entityPath       EventHub name
    +     */
    +    public EventHubBoltConfig(final String connectionString, final String entityPath) {
    +        this(connectionString, entityPath, false, null);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified connection string, eventhub name and
    +     * partition mode.
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param entityPath       EventHub name
    +     * @param partitionMode    partitionMode to apply
    +     */
    +    public EventHubBoltConfig(String connectionString, String entityPath, boolean partitionMode) {
    +        this(connectionString, entityPath, partitionMode, null);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified credentials, eventhub name and
    +     * partition mode.
    +     * <p>
    +     * <p>
    +     * For soverign clouds please use the constructor
    +     * {@link EventHubBolt#EventHubBolt(String, String)}.
    +     * </p>
    +     *
    +     * @param userName      user name to connect as
    +     * @param password      password for the user name
    +     * @param namespace     servicebus namespace
    +     * @param entityPath    EntityHub name
    +     * @param partitionMode Dictates write mode. if true will write to specific partitions
    +     */
    +    public EventHubBoltConfig(String userName, String password, String namespace, String entityPath,
    +                              boolean partitionMode) {
    +        this(userName, password, namespace, FieldConstants.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified connection string, and partition mode.
    +     * The specified {@link IEventDataFormat} will be used to format data to bytes
    +     * before
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param partitionMode    Dictates write mode. if true will write to specific partitions
    +     * @param dataFormat       data formatter for serializing event data
    +     */
    +    public EventHubBoltConfig(String connectionString, String entityPath, boolean partitionMode, IEventDataFormat dataFormat) {
    +        this.connectionString = new ConnectionStringBuilder(connectionString)
    +                .setEventHubName(entityPath)
    +                .toString();
    +        this.partitionMode = partitionMode;
    +        this.dataFormat = dataFormat;
    +        if (this.dataFormat == null) {
    --- End diff --
    
    Nit: It would be somewhat less magical if the default format were set in the constructors that delegate to this one, instead of giving null that meaning, but I understand if you want to preserve the existing behavior.
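A sketch of what the suggestion could look like: each convenience constructor passes the default formatter explicitly, and the full constructor rejects null instead of treating it as "use the default". The types `EventFormat`, `DefaultEventFormat` and `BoltConfigExplicitDefaults` are hypothetical stand-ins for `IEventDataFormat`, `DefaultEventDataFormat` and `EventHubBoltConfig`, simplified to keep the example self-contained.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical stand-ins for IEventDataFormat and DefaultEventDataFormat.
interface EventFormat extends Serializable {
    byte[] serialize(String value);
}

final class DefaultEventFormat implements EventFormat {
    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}

final class BoltConfigExplicitDefaults implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String connectionString;
    private final boolean partitionMode;
    private final EventFormat dataFormat;

    // The convenience constructors state the default themselves...
    BoltConfigExplicitDefaults(String connectionString) {
        this(connectionString, false, new DefaultEventFormat());
    }

    BoltConfigExplicitDefaults(String connectionString, boolean partitionMode) {
        this(connectionString, partitionMode, new DefaultEventFormat());
    }

    // ...so the full constructor can reject null instead of silently substituting a default.
    BoltConfigExplicitDefaults(String connectionString, boolean partitionMode, EventFormat dataFormat) {
        this.connectionString = Objects.requireNonNull(connectionString, "connectionString");
        this.partitionMode = partitionMode;
        this.dataFormat = Objects.requireNonNull(dataFormat, "dataFormat");
    }

    String getConnectionString() { return connectionString; }
    boolean isPartitionMode() { return partitionMode; }
    EventFormat getDataFormat() { return dataFormat; }
}
```

The trade-off the reviewer mentions is visible here: any existing caller that passes null explicitly would now get a `NullPointerException` instead of the default formatter.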


---

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173634917
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/OffsetFilter.java ---
    @@ -1,31 +1,40 @@
    -/*******************************************************************************
    - * Licensed to the Apache Software Foundation (ASF) under one
    - * or more contributor license agreements.  See the NOTICE file
    - * distributed with this work for additional information
    - * regarding copyright ownership.  The ASF licenses this file
    - * to you under the Apache License, Version 2.0 (the
    - * "License"); you may not use this file except in compliance
    - * with the License.  You may obtain a copy of the License at
    - *
    - * http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - *******************************************************************************/
    -package org.apache.storm.eventhubs.spout;
    -
    -import java.io.Serializable;
    -
    -public interface IStateStore extends Serializable {
    -
    -  public void open();
    -
    -  public void close();
    -
    -  public void saveData(String path, String data);
    -
    -  public String readData(String path);
    -}
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +
    +package org.apache.storm.eventhubs.core;
    +
    +public class OffsetFilter implements IEventFilter {
    +    String offset = null;
    +
    +    public OffsetFilter(String offset) {
    +        this.offset = offset;
    +    }
    +
    +    public String getOffset() {
    +        return offset;
    +    }
    +
    +    @Override
    +    public String toString() {
    +        if (offset != null) {
    +            return offset;
    +        }
    +
    +        return null;
    --- End diff --
    
    I don't think returning null from toString is a good idea. Consider using Apache Commons Lang's ToStringBuilder, or manually providing a toString that prints the class name and property values instead. I think this applies to the other toString methods as well: toString is easier to use for debugging when it prints the class name and properties rather than just the wrapped property value.
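A minimal sketch of the manual option: a toString that never returns null and names both the class and its property. `OffsetFilterSketch` is a hypothetical simplification of `OffsetFilter` (the real class also implements `IEventFilter`, omitted here so the example stands alone).

```java
/**
 * Hypothetical variant of OffsetFilter whose toString() never returns null
 * and reports the class name together with its property.
 */
final class OffsetFilterSketch {
    private final String offset;

    OffsetFilterSketch(String offset) {
        this.offset = offset;
    }

    String getOffset() {
        return offset;
    }

    @Override
    public String toString() {
        // String concatenation renders a null offset as "null" instead of returning null.
        return getClass().getSimpleName() + "{offset=" + offset + "}";
    }
}
```

With Commons Lang 3 on the classpath, `new ToStringBuilder(this).append("offset", offset).toString()` achieves the same without hand-written formatting.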


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173636763
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/state/ZookeeperStateStore.java ---
    @@ -0,0 +1,111 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.state;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.CuratorFrameworkFactory;
    +import org.apache.curator.retry.RetryNTimes;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.microsoft.azure.eventhubs.impl.StringUtil;
    +
    +/**
    + * Zookeeper based implementation of the state store.
    + *
    + * @see IStateStore
    + */
    +public class ZookeeperStateStore implements IStateStore {
    +    private static final long serialVersionUID = -995647135239199102L;
    +    private static final Logger logger = LoggerFactory.getLogger(ZookeeperStateStore.class);
    +    private static final int DEFAULT_RETRIES = 3;
    +    private static final int DEFAULT_RETRY_INTERVAL_MS = 100;
    +    private static final String ZK_LOCAL_URL = "localhost:2181";
    +
    +    private final String zookeeperConnectionString;
    +    private final CuratorFramework curatorFramework;
    +
    +    /**
    +     * Creates a new instance. No connection to Zookeeper is established yet.
    +     *
    +     * @param zookeeperConnectionString Zookeeper connection string
    +     */
    +    public ZookeeperStateStore(String zookeeperConnectionString) {
    +        this(zookeeperConnectionString, DEFAULT_RETRIES, DEFAULT_RETRY_INTERVAL_MS);
    +    }
    +
    +    /**
    +     * Creates a new instance. No connection to Zookeeper is established yet.
    +     *
    +     * @param connectionString Zookeeper connection string (example: zk1.azurehdinsight.net:2181)
    +     * @param retries          number of times to retry for transient failures
    +     * @param retryInterval    Sleep interval (in ms) between retry attempts
    +     */
    +    public ZookeeperStateStore(String connectionString, int retries, int retryInterval) {
    +        zookeeperConnectionString = StringUtil.isNullOrWhiteSpace(connectionString) ? ZK_LOCAL_URL : connectionString;
    +        logger.debug("using ZKConnectionString: " + zookeeperConnectionString);
    +
    +        curatorFramework = CuratorFrameworkFactory.newClient(zookeeperConnectionString,
    +                new RetryNTimes(retries, retryInterval));
    +    }
    +
    +    @Override
    +    public void open() {
    +        curatorFramework.start();
    +    }
    +
    +    @Override
    +    public void close() {
    +        curatorFramework.close();
    +    }
    +
    +    @Override
    +    public void saveData(String statePath, String data) {
    +        data = StringUtil.isNullOrWhiteSpace(data) ? "" : data;
    +        byte[] bytes = data.getBytes();
    +
    +        try {
    +            if (curatorFramework.checkExists().forPath(statePath) == null) {
    +                curatorFramework.create().creatingParentsIfNeeded().forPath(statePath, bytes);
    +            } else {
    +                curatorFramework.setData().forPath(statePath, bytes);
    +            }
    +
    +            logger.debug(String.format("data was saved. path: %s, data: %s.", statePath, data));
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public String readData(String statePath) {
    +        try {
    +            if (curatorFramework.checkExists().forPath(statePath) == null) {
    +                return null;
    +            }
    +
    +            String data = new String(curatorFramework.getData().forPath(statePath));
    --- End diff --
    
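    It would be better to specify a charset (e.g. `StandardCharsets.UTF_8`), since `new String(byte[])` decodes with the platform default charset.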
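In the reviewed code, the fix would amount to `data.getBytes(StandardCharsets.UTF_8)` in `saveData` and `new String(curatorFramework.getData().forPath(statePath), StandardCharsets.UTF_8)` in `readData`, so the stored bytes do not depend on each worker JVM's default charset. The helper below is a hypothetical sketch (`StateBytes` is not part of the PR) that isolates those two conversions.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper: charset-explicit conversions for a state store that
 * persists strings as bytes, as saveData/readData do via Curator.
 */
final class StateBytes {
    private StateBytes() {
    }

    // Replaces data.getBytes(), which depends on the JVM's default charset.
    // Only null maps to ""; the reviewed saveData also blanks whitespace-only strings.
    static byte[] encode(String data) {
        return (data == null ? "" : data).getBytes(StandardCharsets.UTF_8);
    }

    // Replaces new String(bytes), which also depends on the JVM's default charset.
    static String decode(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
```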


---

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173636662
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/state/ZookeeperStateStore.java ---
    @@ -0,0 +1,111 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.state;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.CuratorFrameworkFactory;
    +import org.apache.curator.retry.RetryNTimes;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.microsoft.azure.eventhubs.impl.StringUtil;
    +
    +/**
    + * Zookeeper based implementation of the state store.
    + *
    + * @see IStateStore
    + */
    +public class ZookeeperStateStore implements IStateStore {
    +    private static final long serialVersionUID = -995647135239199102L;
    +    private static final Logger logger = LoggerFactory.getLogger(ZookeeperStateStore.class);
    +    private static final int DEFAULT_RETRIES = 3;
    +    private static final int DEFAULT_RETRY_INTERVAL_MS = 100;
    +    private static final String ZK_LOCAL_URL = "localhost:2181";
    +
    +    private final String zookeeperConnectionString;
    +    private final CuratorFramework curatorFramework;
    --- End diff --
    
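    I don't think this is serializable. The class declares a serialVersionUID, but CuratorFramework does not implement Serializable, so serializing an instance (e.g. one passed into the EventHubSpout constructor) would fail with a NotSerializableException.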
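One common way to address this, sketched under assumptions: keep only serializable configuration (the connection string) in regular fields, mark the live client transient, and create it in open(), after the object has been deserialized on the worker. FakeClient is a hypothetical stand-in for CuratorFramework so the example runs with only the JDK. With Curator, open() would call CuratorFrameworkFactory.newClient(...) and then start(). Note that this approach means the client field can no longer be final.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientClientStore implements Serializable {
    private static final long serialVersionUID = 1L;

    // Hypothetical stand-in for CuratorFramework: deliberately NOT Serializable.
    static final class FakeClient {
        final String connectString;
        boolean started;

        FakeClient(String connectString) {
            this.connectString = connectString;
        }
    }

    // Plain configuration survives serialization.
    private final String connectionString;
    // The live client is skipped by serialization and rebuilt in open().
    private transient FakeClient client;

    public TransientClientStore(String connectionString) {
        this.connectionString = connectionString;
    }

    public void open() {
        client = new FakeClient(connectionString);
        client.started = true;
    }

    public boolean isOpen() {
        return client != null && client.started;
    }

    public String getConnectionString() {
        return connectionString;
    }

    // Serializes and deserializes an object with Java serialization,
    // roughly what happens when a topology's components are shipped to workers.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        TransientClientStore store = new TransientClientStore("localhost:2181");
        store.open();
        TransientClientStore copy = roundTrip(store);
        System.out.println(copy.isOpen()); // false: the client was not serialized
        copy.open();
        System.out.println(copy.isOpen()); // true
    }
}
```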


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173633145
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/EventHubConfig.java ---
    @@ -0,0 +1,342 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.core;
    +
    +import java.io.Serializable;
    +
    +import org.apache.storm.eventhubs.format.EventHubMessageDataScheme;
    +import org.apache.storm.eventhubs.format.IEventDataScheme;
    +import org.apache.storm.eventhubs.format.StringEventDataScheme;
    +
    +import com.microsoft.azure.eventhubs.EventHubClient;
    +import com.microsoft.azure.eventhubs.PartitionReceiver;
    +import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
    +
    +/**
    + * Captures connection details for EventHub
    + */
    +public class EventHubConfig implements Serializable {
    +    private static final long serialVersionUID = -2913928074769667240L;
    +    protected String userName;
    +    protected String password;
    +    protected String namespace;
    +    protected String entityPath;
    +    protected int partitionCount;
    +    protected String zkConnectionString = null;
    +    protected int checkpointIntervalInSeconds = 10;
    +    protected int receiverCredits = 1024;
    +    protected int maxPendingMsgsPerPartition = FieldConstants.DEFAULT_MAX_PENDING_PER_PARTITION;
    +    protected int receiveEventsMaxCount = FieldConstants.DEFAULT_RECEIVE_MAX_CAP;
    +    protected int prefetchCount = FieldConstants.DEFAULT_PREFETCH_COUNT;
    +    protected long enqueueTimeFilter = 0;
    +    protected String connectionString;
    +    protected String topologyName;
    +    protected IEventDataScheme eventDataScheme = new StringEventDataScheme();
    +    protected String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
    +
    +    public EventHubConfig(String namespace, String entityPath, String userName, String password, int partitionCount) {
    +        this.namespace = namespace;
    +        this.entityPath = entityPath;
    +        this.userName = userName;
    +        this.password = password;
    +        this.partitionCount = partitionCount;
    +        this.connectionString = new ConnectionStringBuilder()
    +                .setNamespaceName(namespace)
    +                .setEventHubName(entityPath)
    +                .setSasKeyName(userName)
    +                .setSasKey(password)
    +                .toString();
    +    }
    +
    +    /**
    +     * Returns username used in credentials provided to EventHub
    +     *
    +     * @return username
    +     */
    +    public String getUserName() {
    +        return userName;
    +    }
    +
    +    /**
    +     * Returns password used in credentials provided to EventHub
    +     *
    +     * @return password
    +     */
    +    public String getPassword() {
    +        return password;
    +    }
    +
    +    /**
    +     * Returns servicebus namespace used when connecting to EventHub
    +     *
    +     * @return servicebus namespace
    +     */
    +    public String getNamespace() {
    +        return namespace;
    +    }
    +
    +    /**
    +     * Returns name of the EventHub
    +     *
    +     * @return EventHub name
    +     */
    +    public String getEntityPath() {
    +        return entityPath;
    +    }
    +
    +    /**
    +     * Returns specified partition count on the EventHub
    +     *
    +     * @return partition count
    +     */
    +    public int getPartitionCount() {
    +        return partitionCount;
    +    }
    +
    +    /**
    +     * Sets the zookeeper connection string. (Example:
    +     * zk1-clusterfqdn:2181,zk2-clusterfqdn:2181)
    +     *
    +     * @param zkConnectionString Zookeeper connection string
    +     */
    +    public void setZkConnectionString(String zkConnectionString) {
    +        this.zkConnectionString = zkConnectionString;
    +    }
    +
    +    /**
    +     * Returns the configured zookeeper connection string.
    +     */
    +    public String getZkConnectionString() {
    +        return zkConnectionString;
    +    }
    +
    +    /**
    +     * Returns the specified frequency interval at which checkpoint information is
    +     * persisted.
    +     *
    +     * @return checkpoint interval
    +     */
    +    public int getCheckpointIntervalInSeconds() {
    +        return checkpointIntervalInSeconds;
    +    }
    +
    +    /**
    +     * Sets the frequency with which checkpoint information is persisted to
    +     * zookeeper
    +     *
    +     * @param checkpointIntervalInSeconds
    +     */
    +    public void setCheckpointIntervalInSeconds(int checkpointIntervalInSeconds) {
    +        this.checkpointIntervalInSeconds = checkpointIntervalInSeconds;
    +    }
    +
    +    /**
    +     * Returns configured receivercredits used when connecting to EventHub Note:
    +     * <p>
    +     * This is a legacy setting that will soon be deprecated. Please use the
    +     * {@link EventHubConfig#setReceiveEventsMaxCount(int)} instead.
    +     * </p>
    +     *
    +     * @return
    +     * @deprecated
    +     */
    +    public int getReceiverCredits() {
    +        return receiverCredits;
    +    }
    +
    +    /**
    +     * Configures receivercredits used when connecting to EventHub
    +     * <p>
    +     * Note: This is a legacy setting that will soon be deprecated. Please use the
    +     * {@link EventHubConfig#setReceiveEventsMaxCount(int)} instead.
    +     * </p>
    +     *
    +     * @deprecated
    +     */
    +    public void setReceiverCredits(int receiverCredits) {
    +        this.receiverCredits = receiverCredits;
    +    }
    +
    +    /**
    +     * Returns the configured the size of the pending queue for each partition.
    +     * While the pending queue is at full capacity no new receive calls will be made
    +     * to EventHub. The default value for it is
    +     * {@link FieldConstants#DEFAULT_MAX_PENDING_PER_PARTITION}
    +     *
    +     * @return
    +     */
    +    public int getMaxPendingMsgsPerPartition() {
    +        return maxPendingMsgsPerPartition;
    +    }
    +
    +    /**
    +     * configured the size of the pending queue for each partition. While the
    +     * pending queue is at full capacity no new receive calls will be made to
    +     * EventHub. The default value for it is
    +     * {@link FieldConstants#DEFAULT_MAX_PENDING_PER_PARTITION}
    +     *
    +     * @param maxPendingMsgsPerPartition
    +     */
    +    public void setMaxPendingMsgsPerPartition(int maxPendingMsgsPerPartition) {
    +        this.maxPendingMsgsPerPartition = maxPendingMsgsPerPartition;
    +    }
    +
    +    /**
    +     * Returns the configured upper limit on number of events that can be received
    +     * from EventHub per call. Default is
    +     * {@link FieldConstants#DEFAULT_RECEIVE_MAX_CAP}
    +     *
    +     * @return
    +     */
    +    public int getReceiveEventsMaxCount() {
    +        return receiveEventsMaxCount;
    +    }
    +
    +    /**
    +     * Configures the upper limit on number of events that can be received from
    +     * EventHub per call. Default is {@link FieldConstants#DEFAULT_RECEIVE_MAX_CAP}
    +     * <p>
    +     * Setting this to a value greater than one will reduce the number of calls that
    +     * are made to EventHub. The received events are buffered in an internal cache
    +     * and fed to the spout during the nextTuple call.
    +     * </p>
    +     *
    +     * @param receiveEventsMaxCount
    +     * @return
    +     */
    +    public void setReceiveEventsMaxCount(int receiveEventsMaxCount) {
    +        this.receiveEventsMaxCount = receiveEventsMaxCount;
    +    }
    +
    +    /**
    +     * Returns the configured value for the TimeBased filter for when to start
    +     * receiving events from.
    +     *
    +     * @return
    +     */
    +    public long getEnqueueTimeFilter() {
    +        return enqueueTimeFilter;
    +    }
    +
    +    /**
    +     * Configures value for the TimeBased filter for when to start receiving events
    +     * from.
    +     *
    +     * @param enqueueTimeFilter
    +     */
    +    public void setEnqueueTimeFilter(long enqueueTimeFilter) {
    +        this.enqueueTimeFilter = enqueueTimeFilter;
    +    }
    +
    +    /**
    +     * Returns the connection string used when talking to EventHub
    +     *
    +     * @return
    +     */
    +    public String getConnectionString() {
    +        return connectionString;
    +    }
    +
    +    /**
    +     * Configures the connection string to be used when talking to EventHub
    +     *
    +     * @param connectionString
    +     */
    +    public void setConnectionString(String connectionString) {
    +        this.connectionString = connectionString;
    +    }
    +
    +    /**
    +     * Name of the toppology
    +     *
    +     * @return
    +     */
    +    public String getTopologyName() {
    +        return topologyName;
    +    }
    +
    +    /**
    +     * Name of the topology
    +     *
    +     * @param topologyName
    +     */
    +    public void setTopologyName(String topologyName) {
    +        this.topologyName = topologyName;
    +    }
    +
    +    /**
    +     * Returns the configured Serialization/Deserialization scheme in use.
    +     * <p>
    +     * Please refer to {@link IEventDataScheme} for implementation choices.
    +     * </p>
    +     *
    +     * @return
    +     */
    +    public IEventDataScheme getEventDataScheme() {
    +        return eventDataScheme;
    +    }
    +
    +    /**
    +     * Configures Serialization/Deserialization scheme in use.
    +     * <p>
    +     * Please refer to {@link IEventDataScheme} for implementation choices.
    +     * </p>
    +     *
    +     * @param scheme
    +     */
    +    public void setEventDataScheme(IEventDataScheme scheme) {
    +        this.eventDataScheme = scheme;
    +    }
    +
    +    /**
    +     * Consumer group name to use when receiveing events from EventHub
    --- End diff --
    
    Nit: receiveing -> receiving
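The setReceiveEventsMaxCount javadoc in the EventHubConfig diff above describes receiving up to N events per call, buffering them in an internal cache, and feeding them to the spout during nextTuple. Below is a minimal, JDK-only sketch of that buffering idea. BatchBuffer and the IntFunction receive source are hypothetical and are not the PR's actual classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical buffer: one receive call fetches up to maxCount events,
// and next() hands them out one per call (as nextTuple would).
public class BatchBuffer<T> {
    private final Deque<T> cache = new ArrayDeque<>();
    private final IntFunction<List<T>> receiveBatch;
    private final int maxCount;
    private int receiveCalls;

    public BatchBuffer(IntFunction<List<T>> receiveBatch, int maxCount) {
        this.receiveBatch = receiveBatch;
        this.maxCount = maxCount;
    }

    // Returns the next event, or null if the source currently has nothing.
    public T next() {
        if (cache.isEmpty()) {
            receiveCalls++;
            List<T> batch = receiveBatch.apply(maxCount);
            if (batch != null) {
                cache.addAll(batch);
            }
        }
        return cache.poll();
    }

    public int getReceiveCalls() {
        return receiveCalls;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            source.add(i);
        }
        // Simulated partition: hands out up to n remaining events per call.
        IntFunction<List<Integer>> receive = n -> {
            List<Integer> batch = new ArrayList<>(source.subList(0, Math.min(n, source.size())));
            source.subList(0, batch.size()).clear();
            return batch;
        };
        BatchBuffer<Integer> buffer = new BatchBuffer<>(receive, 4);
        for (int i = 0; i < 10; i++) {
            buffer.next();
        }
        // 10 events with a max of 4 per call: batches of 4, 4 and 2.
        System.out.println(buffer.getReceiveCalls()); // 3
    }
}
```

With a batch size of 1 the same 10 events would take 10 receive calls, which is the reduction in calls the javadoc refers to.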


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173634265
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/EventHubReceiverImpl.java ---
    @@ -0,0 +1,155 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.core;
    +
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +
    +import com.microsoft.azure.eventhubs.EventHubException;
    +import org.apache.storm.metric.api.CountMetric;
    +import org.apache.storm.metric.api.MeanReducer;
    +import org.apache.storm.metric.api.ReducedMetric;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.collect.Iterables;
    +import com.microsoft.azure.eventhubs.EventData;
    +import com.microsoft.azure.eventhubs.EventHubClient;
    +import com.microsoft.azure.eventhubs.PartitionReceiver;
    +
    +/**
    + * {@link PartitionReceiver} based implementation to receives messages from a
    + * given Eventhub partition
    + *
    + */
    +public class EventHubReceiverImpl implements IEventHubReceiver {
    +	private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class);
    +
    +	private final EventHubConfig eventHubConfig;
    +	private final String partitionId;
    +
    +	private PartitionReceiver receiver;
    +	private EventHubClient ehClient;
    +	private ExecutorService executorService;
    +
    +	private ReducedMetric receiveApiLatencyMean;
    +	private CountMetric receiveApiCallCount;
    +	private CountMetric receiveMessageCount;
    +
    +	/**
    +	 * Creates a new instance based on provided configuration. The connection, and
    +	 * consumer group settings are read from the passed in EventHubConfig instance.
    +	 * 
    +	 * @param config
    +	 *            Connection, consumer group settings
    +	 * @param partitionId
    +	 *            target partition id to connect to and read from
    +	 */
    +	public EventHubReceiverImpl(EventHubConfig config, String partitionId) {
    +		this.partitionId = partitionId;
    +		this.eventHubConfig = config;
    +
    +		receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
    +		receiveApiCallCount = new CountMetric();
    +		receiveMessageCount = new CountMetric();
    +	}
    +
    +	@Override
    +	public void open(IEventFilter filter) throws IOException, EventHubException {
    +		long start = System.currentTimeMillis();
    +		logger.debug(String.format("Creating EventHub Client: partitionId: %s, filter value:%s, prefetchCount: %s",
    +				partitionId, filter.toString(), String.valueOf(eventHubConfig.getPrefetchCount())));
    +		executorService = Executors.newSingleThreadExecutor();
    +		ehClient = EventHubClient.createSync(eventHubConfig.getConnectionString(), executorService);
    +		receiver = PartitionReceiverFactory.createReceiver(ehClient, filter, eventHubConfig, partitionId);
    +		receiver.setPrefetchCount(eventHubConfig.getPrefetchCount());
    +		logger.debug("created eventhub receiver, time taken(ms): " + (System.currentTimeMillis() - start));
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (receiver == null)
    +			return;
    +
    +		try {
    +			receiver.close().whenCompleteAsync((voidargs, error) -> {
    +				try {
    +					if (error != null) {
    +						logger.error("Exception during receiver close phase: " + error.toString());
    +					}
    +					ehClient.closeSync();
    +				} catch (Exception e) {
    +					logger.error("Exception during ehclient close phase: " + e.toString());
    +				}
    +			}).get();
    +		} catch (InterruptedException | ExecutionException e) {
    +			logger.warn("Exception occured during close phase: " + e.toString());
    +		}
    +
    +		executorService.shutdown();
    +
    +		logger.info("closed eventhub receiver: partitionId=" + partitionId);
    +		ehClient = null;
    +		receiver = null;
    +		executorService = null;
    +	}
    +
    +	@Override
    +	public boolean isOpen() {
    +		return (receiver != null);
    +	}
    +
    +	@Override
    +	public Iterable<EventData> receive() {
    +		return receive(eventHubConfig.getReceiveEventsMaxCount());
    +	}
    +
    +	@Override
    +	public Iterable<EventData> receive(int batchSize) {
    +		long start = System.currentTimeMillis();
    +		Iterable<EventData> receivedEvents = null;
    +
    +		try {
    +			receivedEvents = receiver.receiveSync(batchSize);
    +			if (receivedEvents != null) {
    +				logger.debug("Batchsize: " + batchSize + ", Received event count: " + Iterables.size(receivedEvents));
    +			}
    +		} catch (EventHubException e) {
    +			logger.error("Exception occured during receive" + e.toString());
    +			return null;
    +		}
    +		long end = System.currentTimeMillis();
    +		long millis = (end - start);
    +		receiveApiLatencyMean.update(millis);
    +		receiveApiCallCount.incr();
    +		return receivedEvents;
    --- End diff --
    
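    I think it would be safer to return an empty list instead of null when nothing is received (both when receiveSync returns null and in the exception path above), so callers don't need a null check.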
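Here is a minimal sketch of the suggestion, assuming callers only iterate over the result. SafeReceive and its BatchSource interface are hypothetical stand-ins for PartitionReceiver.receiveSync(int). The point is that both the "nothing received" path and the exception path return Collections.emptyList() instead of null.

```java
import java.util.Collections;

public final class SafeReceive {
    // Hypothetical stand-in for a receiver whose receive call may return null or throw.
    @FunctionalInterface
    public interface BatchSource<T> {
        Iterable<T> receive(int batchSize) throws Exception;
    }

    private SafeReceive() {
    }

    // Never returns null, so callers can always iterate the result directly.
    public static <T> Iterable<T> receive(BatchSource<T> source, int batchSize) {
        try {
            Iterable<T> events = source.receive(batchSize);
            return events != null ? events : Collections.<T>emptyList();
        } catch (Exception e) {
            // In the PR the error is logged here; the caller simply sees "no events".
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        int count = 0;
        for (Object ignored : receive(n -> null, 10)) {
            count++;
        }
        System.out.println(count); // 0, and no NullPointerException
    }
}
```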


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173634151
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/EventHubReceiverImpl.java ---
    @@ -0,0 +1,155 @@
    --- End diff --
    
    I'm not sure how the client uses the ExecutorService, but you might consider using shutdownNow instead: shutdown lets every already-submitted task run to completion before the executor stops. Also consider adding an awaitTermination call here so that close doesn't return until the executor has actually terminated.
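Here is a sketch of the suggested shutdown sequence using only java.util.concurrent. The helper name closeExecutor and the 5-second timeout are assumptions for illustration. shutdownNow() stops waiting tasks from running and attempts to interrupt running ones. awaitTermination then blocks until the executor has actually stopped or the timeout expires.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class ExecutorCloser {
    private ExecutorCloser() {
    }

    // Stops the executor and waits (bounded) until it has terminated.
    // Returns true if the executor terminated within the timeout.
    public static boolean closeExecutor(ExecutorService executor, long timeout, TimeUnit unit) {
        if (executor == null) {
            return true;
        }
        // Interrupt running tasks and drop any that are still queued.
        executor.shutdownNow();
        try {
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the calling thread.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // A task that would otherwise sleep for a minute; shutdownNow interrupts it.
        executor.submit(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        boolean terminated = closeExecutor(executor, 5, TimeUnit.SECONDS);
        System.out.println(terminated + " " + executor.isTerminated()); // true true
    }
}
```

In close() this would take the place of the bare executorService.shutdown() call.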


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173636141
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java ---
    @@ -17,246 +17,281 @@
      *******************************************************************************/
     package org.apache.storm.eventhubs.spout;
     
    -import com.google.common.base.Strings;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +
    +import org.apache.commons.lang.StringUtils;
     import org.apache.storm.Config;
    +import org.apache.storm.eventhubs.core.EventHubConfig;
    +import org.apache.storm.eventhubs.core.EventHubMessage;
    +import org.apache.storm.eventhubs.core.EventHubReceiverImpl;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.core.IEventHubReceiver;
    +import org.apache.storm.eventhubs.core.IEventHubReceiverFactory;
    +import org.apache.storm.eventhubs.core.IPartitionCoordinator;
    +import org.apache.storm.eventhubs.core.IPartitionManager;
    +import org.apache.storm.eventhubs.core.IPartitionManagerFactory;
    +import org.apache.storm.eventhubs.core.MessageId;
    +import org.apache.storm.eventhubs.core.PartitionManager;
    +import org.apache.storm.eventhubs.core.StaticPartitionCoordinator;
    +import org.apache.storm.eventhubs.state.IStateStore;
    +import org.apache.storm.eventhubs.state.ZookeeperStateStore;
     import org.apache.storm.metric.api.IMetric;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.HashMap;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.UUID;
    -
    -public class EventHubSpout extends BaseRichSpout {
    -
    -  private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class);
    -
    -  private final UUID instanceId;
    -  private final EventHubSpoutConfig eventHubConfig;
    -  private final IEventDataScheme scheme;
    -  private final int checkpointIntervalInSeconds;
    -
    -  private IStateStore stateStore;
    -  private IPartitionCoordinator partitionCoordinator;
    -  private IPartitionManagerFactory pmFactory;
    -  private IEventHubReceiverFactory recvFactory;
    -  private SpoutOutputCollector collector;
    -  private long lastCheckpointTime;
    -  private int currentPartitionIndex = -1;
    -
    -  public EventHubSpout(String username, String password, String namespace,
    -      String entityPath, int partitionCount) {
    -    this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount));
    -  }
    -
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig) {
    -    this(spoutConfig, null, null, null);
    -  }
    -  
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig,
    -      IStateStore store,
    -      IPartitionManagerFactory pmFactory,
    -      IEventHubReceiverFactory recvFactory) {
    -    this.eventHubConfig = spoutConfig;
    -    this.scheme = spoutConfig.getEventDataScheme();
    -    this.instanceId = UUID.randomUUID();
    -    this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds();
    -    this.lastCheckpointTime = System.currentTimeMillis();
    -    stateStore = store;
    -    this.pmFactory = pmFactory;
    -    if(this.pmFactory == null) {
    -      this.pmFactory = new IPartitionManagerFactory() {
    -        @Override
    -        public IPartitionManager create(EventHubSpoutConfig spoutConfig,
    -            String partitionId, IStateStore stateStore,
    -            IEventHubReceiver receiver) {
    -          return new PartitionManager(spoutConfig, partitionId,
    -              stateStore, receiver);
    -        }
    -      };
    +import com.google.common.base.Strings;
    +
    +/**
    + * Emit tuples (messages) from an Azure EventHub
    + */
    +public final class EventHubSpout extends BaseRichSpout {
    +
    +    private static final long serialVersionUID = -8460916098313963614L;
    +
    +    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubSpout.class);
    +
    +    private final EventHubSpoutConfig eventHubConfig;
    +    private final int checkpointIntervalInSeconds;
    +
    +    private IStateStore stateStore;
    +    private IPartitionCoordinator partitionCoordinator;
    +    private IPartitionManagerFactory pmFactory;
    +    private IEventHubReceiverFactory recvFactory;
    +    private SpoutOutputCollector collector;
    +    private long lastCheckpointTime;
    +    private int currentPartitionIndex = -1;
    +
    +    public EventHubSpout(
    +            final String username,
    +            final String password,
    +            final String namespace,
    +            final String entityPath,
    +            final int partitionCount) {
    +        this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount));
         }
    -    this.recvFactory = recvFactory;
    -    if(this.recvFactory == null) {
    -      this.recvFactory = new IEventHubReceiverFactory() {
    -        @Override
    -        public IEventHubReceiver create(EventHubSpoutConfig spoutConfig,
    -            String partitionId) {
    -          return new EventHubReceiverImpl(spoutConfig, partitionId);
    -        }
    -      };
    +
    +    public EventHubSpout(
    +            final String username,
    +            final String password,
    +            final String namespace,
    +            final String entityPath,
    +            final int partitionCount,
    +            final int batchSize) {
    +        this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount, batchSize));
         }
    -    
    -  }
    -  
    -  /**
    -   * This is a extracted method that is easy to test
    -   * @param config
    -   * @param totalTasks
    -   * @param taskIndex
    -   * @param collector
    -   * @throws Exception
    -   */
    -  public void preparePartitions(Map<String, Object> config, int totalTasks, int taskIndex, SpoutOutputCollector collector) throws Exception {
    -    this.collector = collector;
    -    if(stateStore == null) {
    -      String zkEndpointAddress = eventHubConfig.getZkConnectionString();
    -      if (zkEndpointAddress == null || zkEndpointAddress.length() == 0) {
    -        //use storm's zookeeper servers if not specified.
    -        @SuppressWarnings("unchecked")
    -        List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS);
    -        Integer zkPort = ((Number) config.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    -        StringBuilder sb = new StringBuilder();
    -        for (String zk : zkServers) {
    -          if (sb.length() > 0) {
    -            sb.append(',');
    -          }
    -          sb.append(zk+":"+zkPort);
    +
    +    public EventHubSpout(final EventHubSpoutConfig spoutConfig) {
    +        this(spoutConfig, null, null, null);
    +    }
    +
    +    public EventHubSpout(
    +            final EventHubSpoutConfig spoutConfig,
    +            final IStateStore store,
    +            final IPartitionManagerFactory pmFactory,
    +            final IEventHubReceiverFactory recvFactory) {
    +        this.eventHubConfig = spoutConfig;
    +        this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds();
    +        this.lastCheckpointTime = System.currentTimeMillis();
    +        stateStore = store;
    +        this.pmFactory = pmFactory;
    +        if (this.pmFactory == null) {
    +            this.pmFactory = new IPartitionManagerFactory() {
    +                private static final long serialVersionUID = -3134660797825594845L;
    +
    +                @Override
    +                public IPartitionManager create(EventHubConfig ehConfig, String partitionId, IStateStore stateStore,
    +                                                IEventHubReceiver receiver) {
    +                    return new PartitionManager(spoutConfig, partitionId, stateStore, receiver);
    +                }
    +            };
    +        }
    +        this.recvFactory = recvFactory;
    +        if (this.recvFactory == null) {
    +            this.recvFactory = new IEventHubReceiverFactory() {
    +
    +                private static final long serialVersionUID = 7215384402396274196L;
    +
    +                @Override
    +                public IEventHubReceiver create(EventHubConfig spoutConfig, String partitionId) {
    +                    return new EventHubReceiverImpl(spoutConfig, partitionId);
    +                }
    +
    +            };
             }
    -        zkEndpointAddress = sb.toString();
    -      }
    -      stateStore = new ZookeeperStateStore(zkEndpointAddress,
    -          Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()),
    -          Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL).toString()));
         }
    -    stateStore.open();
     
    -    partitionCoordinator = new StaticPartitionCoordinator(
    -        eventHubConfig, taskIndex, totalTasks, stateStore, pmFactory, recvFactory);
    +    /**
    +     * This is a extracted method that is easy to test
    +     *
    +     * @param config
    +     * @param totalTasks
    +     * @param taskIndex
    +     * @param collector
    +     * @throws Exception
    +     */
    +    public void preparePartitions(
    +            final Map config,
    +            final int totalTasks,
    +            final int taskIndex,
    +            final SpoutOutputCollector collector) throws Exception {
    +        this.collector = collector;
    +        if (stateStore == null) {
    +            String zkEndpointAddress = eventHubConfig.getZkConnectionString();
    +            if (StringUtils.isBlank(zkEndpointAddress)) {
    +                @SuppressWarnings("unchecked")
    +                List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS);
    +                Integer zkPort = ((Number) config.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    +                zkEndpointAddress = String.join(",",
    --- End diff --
    
    Nit: I think you can simplify this with `Collectors.joining(",")` (https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collectors.html#joining-java.lang.CharSequence-).
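The suggestion can be sketched roughly as below. This assumes `zkServers` is the `List<String>` read from `Config.STORM_ZOOKEEPER_SERVERS` and `zkPort` is the int port. The class and method names are hypothetical, and Storm's `Config` is left out so the snippet compiles on its own.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class ZkConnectString {
    // Builds "host1:port,host2:port,..." from ZooKeeper hosts that share one port.
    static String build(List<String> zkServers, int zkPort) {
        return zkServers.stream()
                .map(server -> server + ":" + zkPort)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("zk1.example.com", "zk2.example.com");
        // prints zk1.example.com:2181,zk2.example.com:2181
        System.out.println(build(servers, 2181));
    }
}
```

This replaces the manual `StringBuilder` loop, including its "append a comma unless first" check, with a single stream pipeline.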


---

[GitHub] storm issue #2588: Microsoft Azure EventHubs Storm Spout and Bolt improvemen...

Posted by SreeramGarlapati <gi...@git.apache.org>.
Github user SreeramGarlapati commented on the issue:

    https://github.com/apache/storm/pull/2588
  
    @srdo - when I try to merge my changes with `master`, I end up with merge conflicts caused by a `line endings` change. Is there a simple tip I can use to get through this, like converting my files to a specific `line ending` before the merge?
    Thanks a lot for the help so far!
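One way to do what the question suggests, normalizing line endings before the merge, is sketched below as a small Java utility that rewrites CRLF as LF in place. This assumes the files are UTF-8 text and that `master` uses LF. The thread does not say which ending upstream expects. Git itself also offers options that may fit better, such as a `* text=auto` rule in `.gitattributes` or the `renormalize` merge option (`git merge -X renormalize`). The class name `LineEndings` is hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class LineEndings {
    // Replaces every CRLF pair with a bare LF; lone CR or LF characters are left untouched.
    static String toLf(String text) {
        return text.replace("\r\n", "\n");
    }

    // Rewrites a file in place with LF endings, assuming it is UTF-8 text.
    static void normalizeFile(Path file) throws IOException {
        String original = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        String normalized = toLf(original);
        if (!normalized.equals(original)) {
            Files.write(file, normalized.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java LineEndings <file>...
        for (String arg : args) {
            normalizeFile(Paths.get(arg));
        }
    }
}
```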


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173631657
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java ---
    @@ -31,116 +30,131 @@
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.Map;
    -import java.util.concurrent.ExecutionException;
    -
    -import java.util.Map;
    -import java.util.concurrent.ExecutionException;
    +import com.microsoft.azure.eventhubs.EventData;
    +import com.microsoft.azure.eventhubs.EventHubClient;
    +import com.microsoft.azure.eventhubs.PartitionSender;
    +import com.microsoft.azure.eventhubs.EventHubException;
     
     /**
      * A bolt that writes event message to EventHub.
    + * <p>
    + * <p>
    + * The implementation has two modes of operation:
    + * <ul>
    + * <li>partitionmode = true, One bolt for per partition write.</li>
    + * <li>partitionmode = false, use default partitioning key strategy to write to
    + * partition(s)</li>
    + * </ul>
    + * </p>
      */
     public class EventHubBolt extends BaseRichBolt {
    -	private static final long serialVersionUID = 1L;
    -	private static final Logger logger = LoggerFactory
    -			.getLogger(EventHubBolt.class);
    -
    -	protected OutputCollector collector;
    -	protected PartitionSender sender;
    -	protected EventHubClient ehClient;
    -	protected EventHubBoltConfig boltConfig;
    -
    -	public EventHubBolt(String connectionString, String entityPath) {
    -		boltConfig = new EventHubBoltConfig(connectionString, entityPath);
    -	}
    -
    -	public EventHubBolt(String userName, String password, String namespace,
    -			String entityPath, boolean partitionMode) {
    -		boltConfig = new EventHubBoltConfig(userName, password, namespace,
    -				entityPath, partitionMode);
    -	}
    -
    -	public EventHubBolt(EventHubBoltConfig config) {
    -		boltConfig = config;
    -	}
    -
    -	@Override
    -	public void prepare(Map<String, Object> config, TopologyContext context,
    -			OutputCollector collector) {
    -		this.collector = collector;
    -		String myPartitionId = null;
    -		if (boltConfig.getPartitionMode()) {
    -			// We can use the task index (starting from 0) as the partition ID
    -			myPartitionId = "" + context.getThisTaskIndex();
    -		}
    -		logger.info("creating sender: " + boltConfig.getConnectionString()
    -				+ ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
    -		try {
    -			ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
    -			if (boltConfig.getPartitionMode()) {
    -				sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
    -			}
    -		} catch (Exception ex) {
    -			collector.reportError(ex);
    -			throw new RuntimeException(ex);
    -		}
    -
    -	}
    -
    -	@Override
    -	public void execute(Tuple tuple) {
    -		try {
    -			EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple));
    -			if (boltConfig.getPartitionMode() && sender!=null) {
    -				sender.sendSync(sendEvent);
    -			}
    -			else if (boltConfig.getPartitionMode() && sender==null) {
    -				throw new EventHubException("Sender is null");
    -			}
    -			else if (!boltConfig.getPartitionMode() && ehClient!=null) {
    -				ehClient.sendSync(sendEvent);
    -			}
    -			else if (!boltConfig.getPartitionMode() && ehClient==null) {
    -				throw new EventHubException("ehclient is null");
    -			}
    -			collector.ack(tuple);
    -		} catch (EventHubException ex ) {
    -			collector.reportError(ex);
    -			collector.fail(tuple);
    -		} catch (ServiceBusException e) {
    -			collector.reportError(e);
    -			collector.fail(tuple);
    -		}
    -	}
    -
    -	@Override
    -	public void cleanup() {
    -		if(sender != null) {
    -			try {
    -				sender.close().whenComplete((voidargs,error)->{
    -					try{
    -						if(error!=null){
    -							logger.error("Exception during sender cleanup phase"+error.toString());
    -						}
    -						ehClient.closeSync();
    -					}catch (Exception e){
    -						logger.error("Exception during ehclient cleanup phase"+e.toString());
    -					}
    -				}).get();
    -			} catch (InterruptedException e) {
    -				logger.error("Exception occured during cleanup phase"+e.toString());
    -			} catch (ExecutionException e) {
    -				logger.error("Exception occured during cleanup phase"+e.toString());
    -			}
    -			logger.info("Eventhub Bolt cleaned up");
    -			sender = null;
    -			ehClient =  null;
    -		}
    -	}
    -
    -	@Override
    -	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    -
    -	}
    -
    +    private static final long serialVersionUID = 1L;
    +    private static final Logger logger = LoggerFactory.getLogger(EventHubBolt.class);
    +
    +    private ExecutorService executorService;
    +    protected OutputCollector collector;
    +    protected EventHubClient ehClient;
    +    protected PartitionSender sender;
    +    protected EventHubBoltConfig boltConfig;
    +
    +    /**
    +     * Constructs an instance that uses the specified connection string to connect
    +     * to an EventHub and write to the specified entityPath
    +     *
    +     * @param connectionString EventHub connection String
    +     * @param entityPath       entity path to write to
    +     */
    +    public EventHubBolt(String connectionString, String entityPath) {
    +        boltConfig = new EventHubBoltConfig(connectionString, entityPath);
    +    }
    +
    +    /**
    +     * Constructs an instance that connects to an EventHub using the specified
    +     * connection credentials.
    +     *
    +     * @param userName      UserName to connect as
    +     * @param password      Password to use
    +     * @param namespace     target namespace for the service bus
    +     * @param entityPath    Name of the event hub
    +     * @param partitionMode number of partitions
    +     */
    +    public EventHubBolt(String userName, String password, String namespace, String entityPath, boolean partitionMode) {
    +        boltConfig = new EventHubBoltConfig(userName, password, namespace, entityPath, partitionMode);
    +    }
    +
    +    /**
    +     * Constructs an instance using the specified configuration
    +     *
    +     * @param config EventHub connection and partition configuration
    +     */
    +    public EventHubBolt(EventHubBoltConfig config) {
    +        boltConfig = config;
    +    }
    +
    +    @Override
    +    public void prepare(Map<String, Object> config, TopologyContext context,
    +                        OutputCollector collector) {
    +        this.collector = collector;
    +        logger.info(String.format("Conn String: %s, PartitionMode: %s", boltConfig.getConnectionString(),
    +                String.valueOf(boltConfig.getPartitionMode())));
    +        try {
    +            executorService = Executors.newSingleThreadExecutor();
    +            ehClient = EventHubClient.createSync(boltConfig.getConnectionString(), executorService);
    +            if (boltConfig.getPartitionMode()) {
    +                String partitionId = String.valueOf(context.getThisTaskIndex());
    +                logger.info("Writing to partition id: " + partitionId);
    +                sender = ehClient.createPartitionSenderSync(partitionId);
    +            }
    +        } catch (Exception ex) {
    +            collector.reportError(ex);
    +            throw new RuntimeException(ex);
    +        }
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            EventData sendEvent = EventData.create(boltConfig.getEventDataFormat().serialize(tuple));
    +            if (sender == null) {
    +                ehClient.sendSync(sendEvent);
    +            } else {
    +                sender.sendSync(sendEvent);
    +            }
    +            collector.ack(tuple);
    +        } catch (EventHubException e) {
    +            collector.reportError(e);
    +            collector.fail(tuple);
    +        }
    +    }
    +
    +    @Override
    +    public void cleanup() {
    +        logger.debug("EventHubBolt cleanup");
    +        try {
    +            ehClient.close().whenCompleteAsync((voidargs, error) -> {
    +                try {
    +                    if (error != null) {
    +                        logger.error("Exception during EventHubBolt cleanup phase" + error.toString());
    --- End diff --
    
    Consider logging the exception itself rather than its toString() so you don't lose the stack trace, e.g. `logger.error("Some message", error)`. This looks like a problem in a few places.
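The sketch below shows what gets lost. It uses `java.util.logging` in place of SLF4J so the snippet needs no dependencies. With SLF4J, the equivalent call passes the `Throwable` as the last argument, as in `logger.error("message", error)`. `viaToString` and `fullTrace` are hypothetical helpers that show what each style preserves.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LogThrowableDemo {
    private static final Logger LOGGER = Logger.getLogger(LogThrowableDemo.class.getName());

    // What "message" + error.toString() keeps: only the exception class and message.
    static String viaToString(Throwable error) {
        return "Exception during cleanup phase " + error.toString();
    }

    // Roughly what a logger records when given the Throwable itself: frames plus "Caused by" chains.
    static String fullTrace(Throwable error) {
        StringWriter out = new StringWriter();
        error.printStackTrace(new PrintWriter(out));
        return out.toString();
    }

    public static void main(String[] args) {
        Exception error = new RuntimeException("close failed", new IllegalStateException("link detached"));
        System.out.println(viaToString(error));
        // Preferred: pass the Throwable separately so the handler prints the whole trace.
        LOGGER.log(Level.SEVERE, "Exception during cleanup phase", error);
    }
}
```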


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173636628
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java ---
    @@ -17,240 +17,99 @@
      *******************************************************************************/
     package org.apache.storm.eventhubs.spout;
     
    -import com.microsoft.azure.eventhubs.EventHubClient;
    -import com.microsoft.azure.servicebus.ConnectionStringBuilder;
    -
    -import java.io.Serializable;
    -import java.util.ArrayList;
    -import java.util.List;
    -
    -public class EventHubSpoutConfig implements Serializable {
    -	private static final long serialVersionUID = 1L;
    -
    -	public static final String EH_SERVICE_FQDN_SUFFIX = "servicebus.windows.net";
    -	private final String userName;
    -	private final String password;
    -	private final String namespace;
    -	private final String entityPath;
    -	private final int partitionCount;
    -
    -	private String zkConnectionString = null; // if null then use zookeeper used
    -												// by Storm
    -	private int checkpointIntervalInSeconds = 10;
    -	private int receiverCredits = 1024;
    -	private int maxPendingMsgsPerPartition = 1024;
    -	private long enqueueTimeFilter = 0; // timestamp in millisecond, 0 means
    -										// disabling filter
    -	private String connectionString;
    -	private String topologyName;
    -	private IEventDataScheme scheme = new StringEventDataScheme();
    -	private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
    -	private String outputStreamId;
    -
    -
    -	// These are mandatory parameters
    -	public EventHubSpoutConfig(String username, String password,
    -			String namespace, String entityPath, int partitionCount) {
    -		this.userName = username;
    -		this.password = password;
    -		this.connectionString = new ConnectionStringBuilder(namespace,entityPath,
    -				username,password).toString();
    -		this.namespace = namespace;
    -		this.entityPath = entityPath;
    -		this.partitionCount = partitionCount;
    -	}
    -
    -	// Keep this constructor for backward compatibility
    -	public EventHubSpoutConfig(String username, String password,
    -			String namespace, String entityPath, int partitionCount,
    -			String zkConnectionString) {
    -		this(username, password, namespace, entityPath, partitionCount);
    -		setZkConnectionString(zkConnectionString);
    -	}
    -
    -	// Keep this constructor for backward compatibility
    -	public EventHubSpoutConfig(String username, String password,
    -			String namespace, String entityPath, int partitionCount,
    -			String zkConnectionString, int checkpointIntervalInSeconds,
    -			int receiverCredits) {
    -		this(username, password, namespace, entityPath, partitionCount,
    -				zkConnectionString);
    -		setCheckpointIntervalInSeconds(checkpointIntervalInSeconds);
    -		setReceiverCredits(receiverCredits);
    -	}
    -
    -	public EventHubSpoutConfig(String username, String password,
    -			String namespace, String entityPath, int partitionCount,
    -			String zkConnectionString, int checkpointIntervalInSeconds,
    -			int receiverCredits, long enqueueTimeFilter) {
    -		this(username, password, namespace, entityPath, partitionCount,
    -				zkConnectionString, checkpointIntervalInSeconds,
    -				receiverCredits);
    -		setEnqueueTimeFilter(enqueueTimeFilter);
    -	}
    -
    -	// Keep this constructor for backward compatibility
    -	public EventHubSpoutConfig(String username, String password,
    -			String namespace, String entityPath, int partitionCount,
    -			String zkConnectionString, int checkpointIntervalInSeconds,
    -			int receiverCredits, int maxPendingMsgsPerPartition,
    -			long enqueueTimeFilter) {
    -
    -		this(username, password, namespace, entityPath, partitionCount,
    -				zkConnectionString, checkpointIntervalInSeconds,
    -				receiverCredits);
    -		setMaxPendingMsgsPerPartition(maxPendingMsgsPerPartition);
    -		setEnqueueTimeFilter(enqueueTimeFilter);
    -	}
    -
    -	public String getNamespace() {
    -		return namespace;
    -	}
    -
    -	public String getEntityPath() {
    -		return entityPath;
    -	}
    -
    -	public int getPartitionCount() {
    -		return partitionCount;
    -	}
    -
    -	public String getZkConnectionString() {
    -		return zkConnectionString;
    -	}
    -
    -	public void setZkConnectionString(String value) {
    -		zkConnectionString = value;
    -	}
    -
    -	public EventHubSpoutConfig withZkConnectionString(String value) {
    -		setZkConnectionString(value);
    -		return this;
    -	}
    -
    -	public int getCheckpointIntervalInSeconds() {
    -		return checkpointIntervalInSeconds;
    -	}
    -
    -	public void setCheckpointIntervalInSeconds(int value) {
    -		checkpointIntervalInSeconds = value;
    -	}
    -
    -	public EventHubSpoutConfig withCheckpointIntervalInSeconds(int value) {
    -		setCheckpointIntervalInSeconds(value);
    -		return this;
    -	}
    -
    -	public int getReceiverCredits() {
    -		return receiverCredits;
    -	}
    -
    -	public void setReceiverCredits(int value) {
    -		receiverCredits = value;
    -	}
    -
    -	public EventHubSpoutConfig withReceiverCredits(int value) {
    -		setReceiverCredits(value);
    -		return this;
    -	}
    -
    -	public int getMaxPendingMsgsPerPartition() {
    -		return maxPendingMsgsPerPartition;
    -	}
    -
    -	public void setMaxPendingMsgsPerPartition(int value) {
    -		maxPendingMsgsPerPartition = value;
    -	}
    -
    -	public EventHubSpoutConfig withMaxPendingMsgsPerPartition(int value) {
    -		setMaxPendingMsgsPerPartition(value);
    -		return this;
    -	}
    -
    -	public long getEnqueueTimeFilter() {
    -		return enqueueTimeFilter;
    -	}
    -
    -	public void setEnqueueTimeFilter(long value) {
    -		enqueueTimeFilter = value;
    -	}
    -
    -	public EventHubSpoutConfig withEnqueueTimeFilter(long value) {
    -		setEnqueueTimeFilter(value);
    -		return this;
    -	}
    -
    -	public String getTopologyName() {
    -		return topologyName;
    -	}
    -
    -	public void setTopologyName(String value) {
    -		topologyName = value;
    -	}
    -
    -	public EventHubSpoutConfig withTopologyName(String value) {
    -		setTopologyName(value);
    -		return this;
    -	}
    -
    -	public IEventDataScheme getEventDataScheme() {
    -		return scheme;
    -	}
    -
    -	public void setEventDataScheme(IEventDataScheme scheme) {
    -		this.scheme = scheme;
    -	}
    -
    -	public EventHubSpoutConfig withEventDataScheme(IEventDataScheme value) {
    -		setEventDataScheme(value);
    -		return this;
    -	}
    -
    -	public String getConsumerGroupName() {
    -		return consumerGroupName;
    -	}
    -
    -	public void setConsumerGroupName(String value) {
    -		consumerGroupName = value;
    -	}
    -
    -	public EventHubSpoutConfig withConsumerGroupName(String value) {
    -		setConsumerGroupName(value);
    -		return this;
    -	}
    -
    -	public List<String> getPartitionList() {
    -		List<String> partitionList = new ArrayList<String>();
    -
    -		for (int i = 0; i < this.partitionCount; i++) {
    -			partitionList.add(Integer.toString(i));
    -		}
    -
    -		return partitionList;
    -	}
    -
    -	public String getConnectionString() {
    -		return connectionString;
    -	}
    -
    -	/*Keeping it for backward compatibility*/
    -	public void setTargetAddress(String targetFqnAddress) {
    -	}
    -
    -	public void setTargetAddress(){
    -
    -	}
    -
    -	public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) {
    -		setTargetAddress(targetFqnAddress);
    -		return this;
    -	}
    -
    -	public String getOutputStreamId() {
    -		return outputStreamId;
    -	}
    -
    -	public void setOutputStreamId(String outputStreamId) {
    -		this.outputStreamId = outputStreamId;
    -	}
    +import org.apache.storm.eventhubs.core.EventHubConfig;
    +
    +/**
    + * EventHub configuration. This class remains in
    --- End diff --
    
    Nit: Didn't moving the other classes to new packages already break binary compatibility?


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173633306
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/EventHubMessage.java ---
    @@ -0,0 +1,151 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.core;
    +
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
    +import java.time.Instant;
    +import java.util.Map;
    +
    +import org.apache.storm.eventhubs.format.SerializeDeserializeUtil;
    +
    +import com.microsoft.azure.eventhubs.EventData;
    +
    +/**
    + * Represents a message from EventHub. Encapsulates the actual pay load received
    + * from EventHub.
    + * <p>
    + * It encapsulates the raw bytes from the content, any AMQP application
    + * properties set, and the system properties (partition key, offset, enqueue
    + * time, sequence number, and publisher) set on the Eventhub message.
    + */
    +public class EventHubMessage implements Comparable<EventHubMessage> {
    +    private final byte[] content;
    +    private final String partitionId;
    +    private final String partitionKey;
    +    private final String offset;
    +    private final Instant enqueuedTime;
    +    private final long sequenceNumber;
    +    private final String publisher;
    +    private final MessageId messageId;
    +
    +    private final Map<String, Object> applicationProperties;
    +    private final Map<String, Object> systemProperties;
    +
    +    public EventHubMessage(EventData eventdata, String partitionId) {
    +        this.partitionId = partitionId;
    +
    +        if (eventdata.getBytes() != null) {
    +            content = eventdata.getBytes();
    +        } else if (eventdata.getObject() != null) {
    +            try {
    +                content = SerializeDeserializeUtil.serialize(eventdata.getObject());
    +            } catch (IOException e) {
    +                throw new RuntimeException(e);
    +            }
    +        } else {
    +            throw new RuntimeException("Failed to retrieve payload from EventData");
    +        }
    +
    +        applicationProperties = eventdata.getProperties();
    +        EventData.SystemProperties props = eventdata.getSystemProperties();
    +        systemProperties = props;
    --- End diff --
    
    Nit: The `props` variable seems unnecessary; you could assign `eventdata.getSystemProperties()` to `systemProperties` directly.
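The constructor logic in this hunk can be sketched without the Azure SDK as below. It uses the raw bytes if present, otherwise serializes the object body, and assigns the system properties directly. `MessageSketch` and `resolvePayload` are hypothetical stand-ins for `EventHubMessage` and `EventData`. The fallback assumes `SerializeDeserializeUtil.serialize` uses plain Java serialization, which this diff does not show.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Map;

final class MessageSketch {
    private final byte[] content;
    private final Map<String, Object> systemProperties;

    MessageSketch(byte[] bytes, Object body, Map<String, Object> systemProperties) {
        this.content = resolvePayload(bytes, body);
        // Assigned directly; no intermediate local variable is needed.
        this.systemProperties = systemProperties;
    }

    byte[] getContent() {
        return content;
    }

    Map<String, Object> getSystemProperties() {
        return systemProperties;
    }

    // Prefers the raw bytes; otherwise falls back to Java serialization of the body (assumption).
    static byte[] resolvePayload(byte[] bytes, Object body) {
        if (bytes != null) {
            return bytes;
        }
        if (body == null) {
            throw new IllegalArgumentException("Failed to retrieve payload");
        }
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }
}
```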


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173635700
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/format/IEventDataScheme.java ---
    @@ -15,30 +15,32 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      *******************************************************************************/
    -package org.apache.storm.eventhubs.spout;
    -
    -import com.microsoft.azure.eventhubs.EventData;
    -import org.apache.storm.tuple.Fields;
    +package org.apache.storm.eventhubs.format;
     
     import java.io.Serializable;
     import java.util.List;
     
    +import org.apache.storm.eventhubs.core.EventHubMessage;
    +import org.apache.storm.tuple.Fields;
    +
    +/**
    + * Data scheme to use when deserializing bytes read from eventhub.
    + */
     public interface IEventDataScheme extends Serializable {
     
    -  /**
    -   * Deserialize an AMQP Message into a Tuple.
    -   *
    -   * @see #getOutputFields() for the list of fields the tuple will contain.
    -   *
    -   * @param eventData The EventData to Deserialize.
    -   * @return A tuple containing the deserialized fields of the message.
    -   */
    -  List<Object> deserialize(EventData eventData);
    +    /**
    +     * Deserialize read EventHub Message into a Tuple.
    +     *
    +     * @param eventHubMessage The EventHubMessage to be deserialized.
    +     * @return A list of Objects representing the deserialized the message.
    --- End diff --
    
    Nit: The last "the" is unnecessary; it should read "representing the deserialized message".
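For readers new to the scheme contract, here is a dependency-free sketch of a scheme that deserializes a message into a tuple. `DataScheme` and `Utf8StringScheme` are hypothetical stand-ins for `IEventDataScheme` and an implementation of it. Storm's `Fields` is replaced by a list of field names, and `EventHubMessage` by its raw `byte[]` content. The field name `"message"` is also an assumption. Per the PR description, the new scheme actually emits a single grouped type (payload plus audit params) rather than a plain string.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for IEventDataScheme without Storm or Azure dependencies.
interface DataScheme {
    // One entry per output field, in the same order as getOutputFieldNames().
    List<Object> deserialize(byte[] content);

    List<String> getOutputFieldNames();
}

// Decodes the payload as UTF-8 and emits it as the tuple's only field.
final class Utf8StringScheme implements DataScheme {
    @Override
    public List<Object> deserialize(byte[] content) {
        return Collections.<Object>singletonList(new String(content, StandardCharsets.UTF_8));
    }

    @Override
    public List<String> getOutputFieldNames() {
        return Collections.singletonList("message");
    }
}
```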


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173636465
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java ---
    @@ -17,246 +17,281 @@
      *******************************************************************************/
     package org.apache.storm.eventhubs.spout;
     
    -import com.google.common.base.Strings;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +
    +import org.apache.commons.lang.StringUtils;
     import org.apache.storm.Config;
    +import org.apache.storm.eventhubs.core.EventHubConfig;
    +import org.apache.storm.eventhubs.core.EventHubMessage;
    +import org.apache.storm.eventhubs.core.EventHubReceiverImpl;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.core.IEventHubReceiver;
    +import org.apache.storm.eventhubs.core.IEventHubReceiverFactory;
    +import org.apache.storm.eventhubs.core.IPartitionCoordinator;
    +import org.apache.storm.eventhubs.core.IPartitionManager;
    +import org.apache.storm.eventhubs.core.IPartitionManagerFactory;
    +import org.apache.storm.eventhubs.core.MessageId;
    +import org.apache.storm.eventhubs.core.PartitionManager;
    +import org.apache.storm.eventhubs.core.StaticPartitionCoordinator;
    +import org.apache.storm.eventhubs.state.IStateStore;
    +import org.apache.storm.eventhubs.state.ZookeeperStateStore;
     import org.apache.storm.metric.api.IMetric;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.HashMap;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.UUID;
    -
    -public class EventHubSpout extends BaseRichSpout {
    -
    -  private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class);
    -
    -  private final UUID instanceId;
    -  private final EventHubSpoutConfig eventHubConfig;
    -  private final IEventDataScheme scheme;
    -  private final int checkpointIntervalInSeconds;
    -
    -  private IStateStore stateStore;
    -  private IPartitionCoordinator partitionCoordinator;
    -  private IPartitionManagerFactory pmFactory;
    -  private IEventHubReceiverFactory recvFactory;
    -  private SpoutOutputCollector collector;
    -  private long lastCheckpointTime;
    -  private int currentPartitionIndex = -1;
    -
    -  public EventHubSpout(String username, String password, String namespace,
    -      String entityPath, int partitionCount) {
    -    this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount));
    -  }
    -
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig) {
    -    this(spoutConfig, null, null, null);
    -  }
    -  
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig,
    -      IStateStore store,
    -      IPartitionManagerFactory pmFactory,
    -      IEventHubReceiverFactory recvFactory) {
    -    this.eventHubConfig = spoutConfig;
    -    this.scheme = spoutConfig.getEventDataScheme();
    -    this.instanceId = UUID.randomUUID();
    -    this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds();
    -    this.lastCheckpointTime = System.currentTimeMillis();
    -    stateStore = store;
    -    this.pmFactory = pmFactory;
    -    if(this.pmFactory == null) {
    -      this.pmFactory = new IPartitionManagerFactory() {
    -        @Override
    -        public IPartitionManager create(EventHubSpoutConfig spoutConfig,
    -            String partitionId, IStateStore stateStore,
    -            IEventHubReceiver receiver) {
    -          return new PartitionManager(spoutConfig, partitionId,
    -              stateStore, receiver);
    -        }
    -      };
    +import com.google.common.base.Strings;
    +
    +/**
    + * Emit tuples (messages) from an Azure EventHub
    + */
    +public final class EventHubSpout extends BaseRichSpout {
    +
    +    private static final long serialVersionUID = -8460916098313963614L;
    +
    +    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubSpout.class);
    +
    +    private final EventHubSpoutConfig eventHubConfig;
    +    private final int checkpointIntervalInSeconds;
    +
    +    private IStateStore stateStore;
    +    private IPartitionCoordinator partitionCoordinator;
    +    private IPartitionManagerFactory pmFactory;
    +    private IEventHubReceiverFactory recvFactory;
    +    private SpoutOutputCollector collector;
    +    private long lastCheckpointTime;
    +    private int currentPartitionIndex = -1;
    +
    +    public EventHubSpout(
    +            final String username,
    +            final String password,
    +            final String namespace,
    +            final String entityPath,
    +            final int partitionCount) {
    +        this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount));
         }
    -    this.recvFactory = recvFactory;
    -    if(this.recvFactory == null) {
    -      this.recvFactory = new IEventHubReceiverFactory() {
    -        @Override
    -        public IEventHubReceiver create(EventHubSpoutConfig spoutConfig,
    -            String partitionId) {
    -          return new EventHubReceiverImpl(spoutConfig, partitionId);
    -        }
    -      };
    +
    +    public EventHubSpout(
    +            final String username,
    +            final String password,
    +            final String namespace,
    +            final String entityPath,
    +            final int partitionCount,
    +            final int batchSize) {
    +        this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount, batchSize));
         }
    -    
    -  }
    -  
    -  /**
    -   * This is a extracted method that is easy to test
    -   * @param config
    -   * @param totalTasks
    -   * @param taskIndex
    -   * @param collector
    -   * @throws Exception
    -   */
    -  public void preparePartitions(Map<String, Object> config, int totalTasks, int taskIndex, SpoutOutputCollector collector) throws Exception {
    -    this.collector = collector;
    -    if(stateStore == null) {
    -      String zkEndpointAddress = eventHubConfig.getZkConnectionString();
    -      if (zkEndpointAddress == null || zkEndpointAddress.length() == 0) {
    -        //use storm's zookeeper servers if not specified.
    -        @SuppressWarnings("unchecked")
    -        List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS);
    -        Integer zkPort = ((Number) config.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    -        StringBuilder sb = new StringBuilder();
    -        for (String zk : zkServers) {
    -          if (sb.length() > 0) {
    -            sb.append(',');
    -          }
    -          sb.append(zk+":"+zkPort);
    +
    +    public EventHubSpout(final EventHubSpoutConfig spoutConfig) {
    +        this(spoutConfig, null, null, null);
    +    }
    +
    +    public EventHubSpout(
    +            final EventHubSpoutConfig spoutConfig,
    +            final IStateStore store,
    +            final IPartitionManagerFactory pmFactory,
    +            final IEventHubReceiverFactory recvFactory) {
    +        this.eventHubConfig = spoutConfig;
    +        this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds();
    +        this.lastCheckpointTime = System.currentTimeMillis();
    +        stateStore = store;
    +        this.pmFactory = pmFactory;
    +        if (this.pmFactory == null) {
    +            this.pmFactory = new IPartitionManagerFactory() {
    +                private static final long serialVersionUID = -3134660797825594845L;
    +
    +                @Override
    +                public IPartitionManager create(EventHubConfig ehConfig, String partitionId, IStateStore stateStore,
    +                                                IEventHubReceiver receiver) {
    +                    return new PartitionManager(spoutConfig, partitionId, stateStore, receiver);
    +                }
    +            };
    +        }
    +        this.recvFactory = recvFactory;
    +        if (this.recvFactory == null) {
    +            this.recvFactory = new IEventHubReceiverFactory() {
    +
    +                private static final long serialVersionUID = 7215384402396274196L;
    +
    +                @Override
    +                public IEventHubReceiver create(EventHubConfig spoutConfig, String partitionId) {
    +                    return new EventHubReceiverImpl(spoutConfig, partitionId);
    +                }
    +
    +            };
             }
    -        zkEndpointAddress = sb.toString();
    -      }
    -      stateStore = new ZookeeperStateStore(zkEndpointAddress,
    -          Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()),
    -          Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL).toString()));
         }
    -    stateStore.open();
     
    -    partitionCoordinator = new StaticPartitionCoordinator(
    -        eventHubConfig, taskIndex, totalTasks, stateStore, pmFactory, recvFactory);
    +    /**
    +     * This is a extracted method that is easy to test
    +     *
    +     * @param config
    +     * @param totalTasks
    +     * @param taskIndex
    +     * @param collector
    +     * @throws Exception
    +     */
    +    public void preparePartitions(
    +            final Map config,
    +            final int totalTasks,
    +            final int taskIndex,
    +            final SpoutOutputCollector collector) throws Exception {
    +        this.collector = collector;
    +        if (stateStore == null) {
    +            String zkEndpointAddress = eventHubConfig.getZkConnectionString();
    +            if (StringUtils.isBlank(zkEndpointAddress)) {
    +                @SuppressWarnings("unchecked")
    +                List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS);
    +                Integer zkPort = ((Number) config.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    +                zkEndpointAddress = String.join(",",
    +                        zkServers.stream().map(x -> x + ":" + zkPort).collect(Collectors.toList()));
    +            }
     
    -    for (IPartitionManager partitionManager : 
    -      partitionCoordinator.getMyPartitionManagers()) {
    -      partitionManager.open();
    +            stateStore = new ZookeeperStateStore(zkEndpointAddress,
    +                    Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()),
    +                    Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL).toString()));
    +        }
    +        stateStore.open();
    +
    +        LOGGER.info("TaskIndex: " + taskIndex + ", TotalTasks: " + totalTasks + ", Total Partitions:"
    +                + eventHubConfig.getPartitionCount());
    +        partitionCoordinator = new StaticPartitionCoordinator(eventHubConfig, taskIndex, totalTasks, stateStore,
    +                pmFactory, recvFactory);
    +
    +        for (IPartitionManager partitionManager : partitionCoordinator.getMyPartitionManagers()) {
    +            partitionManager.open();
    +        }
         }
    -  }
    -
    -  @Override
    -  public void open(Map<String, Object> config, TopologyContext context, SpoutOutputCollector collector) {
    -    logger.info("begin:start open()");
    -    String topologyName = (String) config.get(Config.TOPOLOGY_NAME);
    -    eventHubConfig.setTopologyName(topologyName);
    -
    -    int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
    -    int taskIndex = context.getThisTaskIndex();
    -    if (totalTasks > eventHubConfig.getPartitionCount()) {
    -      throw new RuntimeException("total tasks of EventHubSpout is greater than partition count.");
    +
    +    @Override
    +    public void open(
    +            final Map config,
    +            final TopologyContext context,
    +            final SpoutOutputCollector collector) {
    +        LOGGER.debug("EventHubSpout start: open()");
    +        String topologyName = (String) config.get(Config.TOPOLOGY_NAME);
    +        eventHubConfig.setTopologyName(topologyName);
    +
    +        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
    +        int taskIndex = context.getThisTaskIndex();
    +        if (totalTasks > eventHubConfig.getPartitionCount()) {
    +            throw new RuntimeException("Total tasks of EventHubSpout " + totalTasks
    +                    + " is greater than partition count: " + eventHubConfig.getPartitionCount());
    +        }
    +
    +        LOGGER.info(
    +                String.format("TopologyName: %s, TotalTasks: %d, TaskIndex: %d", topologyName, totalTasks, taskIndex));
    +
    +        try {
    +            preparePartitions(config, totalTasks, taskIndex, collector);
    +        } catch (Exception e) {
    +            collector.reportError(e);
    +            throw new RuntimeException(e);
    +        }
    +
    +        // register metrics
    +        context.registerMetric("EventHubReceiver", new IMetric() {
    +            @Override
    +            public Object getValueAndReset() {
    +                Map<String, Object> concatMetricsDataMaps = new HashMap<String, Object>();
    +                for (IPartitionManager partitionManager : partitionCoordinator.getMyPartitionManagers()) {
    +                    concatMetricsDataMaps.putAll(partitionManager.getMetricsData());
    +                }
    +                return concatMetricsDataMaps;
    +            }
    +        }, Integer.parseInt(config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS).toString()));
    +        LOGGER.debug("EventHubSpout end: open()");
         }
     
    -    logger.info(String.format("topologyName: %s, totalTasks: %d, taskIndex: %d", topologyName, totalTasks, taskIndex));
    +    @Override
    +    public void nextTuple() {
    +        List<IPartitionManager> partitionManagers = partitionCoordinator.getMyPartitionManagers();
    +        EventHubMessage ehm = null;
    +
    +        for (int i = 0; i < partitionManagers.size(); i++) {
    +            currentPartitionIndex = (currentPartitionIndex + 1) % partitionManagers.size();
    +            IPartitionManager partitionManager = partitionManagers.get(currentPartitionIndex);
    +            if (partitionManager == null) {
    +                throw new RuntimeException(
    +                        "A PartitionManager for partitionid: " + currentPartitionIndex + " doesn't exist.");
    +            }
    +
    +            ehm = partitionManager.receive();
    +            if (ehm != null) {
    +                break;
    +            }
    +        }
    +
    +        if (ehm != null) {
    +            MessageId messageId = ehm.getMessageId();
    +            List<Object> tuples = eventHubConfig.getEventDataScheme().deserialize(ehm);
    +            collector.emit(tuples, messageId);
    +        }
     
    -    try {
    -      preparePartitions(config, totalTasks, taskIndex, collector);
    -    } catch (Exception e) {
    -	  collector.reportError(e);
    -      throw new RuntimeException(e);
    +        checkpointIfNeeded();
         }
    -    
    -    //register metrics
    -    context.registerMetric("EventHubReceiver", new IMetric() {
    -      @Override
    -      public Object getValueAndReset() {
    -          Map concatMetricsDataMaps = new HashMap();
    -          for (IPartitionManager partitionManager : 
    -            partitionCoordinator.getMyPartitionManagers()) {
    -            concatMetricsDataMaps.putAll(partitionManager.getMetricsData());
    -          }
    -          return concatMetricsDataMaps;
    -      }
    -    }, Integer.parseInt(config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS).toString()));
    -    logger.info("end open()");
    -  }
    -
    -  @Override
    -  public void nextTuple() {
    -    EventDataWrap eventDatawrap = null;
    -
    -    List<IPartitionManager> partitionManagers = partitionCoordinator.getMyPartitionManagers();
    -    for (int i = 0; i < partitionManagers.size(); i++) {
    -      currentPartitionIndex = (currentPartitionIndex + 1) % partitionManagers.size();
    -      IPartitionManager partitionManager = partitionManagers.get(currentPartitionIndex);
    -
    -      if (partitionManager == null) {
    -        throw new RuntimeException("partitionManager doesn't exist.");
    -      }
    -
    -      eventDatawrap = partitionManager.receive();
    -
    -      if (eventDatawrap != null) {
    -        break;
    -      }
    +
    +    @Override
    +    public void ack(final Object msgId) {
    +        MessageId messageId = (MessageId) msgId;
    +        IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId());
    +        String offset = messageId.getOffset();
    +        partitionManager.ack(offset);
         }
     
    -    if (eventDatawrap != null) {
    -      MessageId messageId = eventDatawrap.getMessageId();
    -      List<Object> tuples = scheme.deserialize(eventDatawrap.getEventData());
    -      if (tuples != null) {
    -        collector.emit(tuples, messageId);
    -      }
    +    @Override
    +    public void fail(final Object msgId) {
    +        MessageId messageId = (MessageId) msgId;
    +        IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId());
    +        String offset = messageId.getOffset();
    +        partitionManager.fail(offset);
         }
    -    
    -    checkpointIfNeeded();
    -
    -    // We don't need to sleep here because the IPartitionManager.receive() is
    -    // a blocked call so it's fine to call this function in a tight loop.
    -  }
    -
    -  @Override
    -  public void ack(Object msgId) {
    -    MessageId messageId = (MessageId) msgId;
    -    IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId());
    -    String offset = messageId.getOffset();
    -    partitionManager.ack(offset);
    -  }
    -
    -  @Override
    -  public void fail(Object msgId) {
    -    MessageId messageId = (MessageId) msgId;
    -    IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId());
    -    String offset = messageId.getOffset();
    -    partitionManager.fail(offset);
    -  }
    -
    -  @Override
    -  public void deactivate() {
    -    // let's checkpoint so that we can get the last checkpoint when restarting.
    -    checkpoint();
    -  }
    -
    -  @Override
    -  public void close() {
    -    for (IPartitionManager partitionManager : 
    -      partitionCoordinator.getMyPartitionManagers()) {
    -      partitionManager.close();
    +
    +    @Override
    +    public void deactivate() {
    +        // let's checkpoint so that we can get the last checkpoint when restarting.
    +        checkpoint();
         }
    -    stateStore.close();
    -  }
    -
    -  @Override
    -  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    -    if (Strings.isNullOrEmpty(eventHubConfig.getOutputStreamId())) {
    -      declarer.declare(scheme.getOutputFields());
    -    } else {
    -      declarer.declareStream(eventHubConfig.getOutputStreamId(), scheme.getOutputFields());
    +
    +    @Override
    +    public void close() {
    +        for (IPartitionManager partitionManager : partitionCoordinator.getMyPartitionManagers()) {
    +            partitionManager.close();
    +        }
    +        stateStore.close();
         }
    -  }
     
    -  private void checkpointIfNeeded() {
    -    long nextCheckpointTime = lastCheckpointTime + checkpointIntervalInSeconds * 1000;
    -    if (nextCheckpointTime < System.currentTimeMillis()) {
    +    @Override
    +    public void declareOutputFields(final OutputFieldsDeclarer declarer) {
    +        List<String> fields = new LinkedList<String>();
    +        fields.add(FieldConstants.MESSAGE_FIELD);
    --- End diff --
    
    You should use the fields declared by the scheme here; otherwise, what's the point of having IEventDataScheme provide the output fields at all?
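Below is a minimal sketch of what delegating to the scheme could look like. It assumes the scheme exposes its field names, as the removed code did via `scheme.getOutputFields()`. `EventDataScheme`, `RecordingDeclarer` and `SpoutFieldsSketch` are hypothetical stand-ins, not the storm-eventhubs or Storm API, so the sketch compiles without either on the classpath. The stream handling mirrors the removed `declareOutputFields`.

```java
import java.util.List;

// Hypothetical stand-in for IEventDataScheme: the scheme owns the names of the fields it emits.
interface EventDataScheme {
    List<String> getOutputFields();
}

// Hypothetical stand-in for Storm's OutputFieldsDeclarer that records what was declared.
class RecordingDeclarer {
    String streamId; // null means the default stream
    List<String> fields;

    void declare(List<String> fields) {
        this.streamId = null;
        this.fields = fields;
    }

    void declareStream(String streamId, List<String> fields) {
        this.streamId = streamId;
        this.fields = fields;
    }
}

class SpoutFieldsSketch {
    private final EventDataScheme scheme;
    private final String outputStreamId; // may be null or empty

    SpoutFieldsSketch(EventDataScheme scheme, String outputStreamId) {
        this.scheme = scheme;
        this.outputStreamId = outputStreamId;
    }

    // Ask the scheme for its fields instead of hard-coding FieldConstants.MESSAGE_FIELD,
    // so the declared fields stay in sync with whatever the scheme's deserialize() emits.
    void declareOutputFields(RecordingDeclarer declarer) {
        List<String> fields = scheme.getOutputFields();
        if (outputStreamId == null || outputStreamId.isEmpty()) {
            declarer.declare(fields);
        } else {
            declarer.declareStream(outputStreamId, fields);
        }
    }
}
```

With this approach, a custom scheme that emits extra fields (for example, payload plus metadata) gets them declared automatically. The spout no longer needs to know its layout.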


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173635613
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/format/DefaultEventDataFormat.java ---
    @@ -24,24 +24,24 @@
      * into a delimited string.
      */
     public class DefaultEventDataFormat implements IEventDataFormat {
    -  private static final long serialVersionUID = 1L;
    -  private String delimiter = ",";
    -  
    -  public DefaultEventDataFormat withFieldDelimiter(String delimiter) {
    -    this.delimiter = delimiter;
    -    return this;
    -  }
    +    private static final long serialVersionUID = 1L;
    +    private String delimiter = ",";
     
    -  @Override
    -  public byte[] serialize(Tuple tuple) {
    -    StringBuilder sb = new StringBuilder();
    -    for(Object obj : tuple.getValues()) {
    -      if(sb.length() != 0) {
    -        sb.append(delimiter);
    -      }
    -      sb.append(obj.toString());
    +    public DefaultEventDataFormat withFieldDelimiter(String delimiter) {
    +        this.delimiter = delimiter;
    +        return this;
    +    }
    +
    +    @Override
    +    public byte[] serialize(Tuple tuple) {
    +        StringBuilder sb = new StringBuilder();
    +        for (Object obj : tuple.getValues()) {
    +            if (sb.length() != 0) {
    +                sb.append(delimiter);
    +            }
    +            sb.append(obj.toString());
    +        }
    +        return sb.toString().getBytes();
    --- End diff --
    
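    This calls `getBytes()` without a charset, so the output depends on the platform's default encoding. Please pass one explicitly, e.g. `getBytes(StandardCharsets.UTF_8)`.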
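Here is a minimal sketch of the serializer with an explicit charset. `DelimitedUtf8Format` is a hypothetical name. It takes the tuple's values as a plain `List<Object>` (the real class reads `tuple.getValues()`), so it has no Storm dependency.

It also replaces the `sb.length() != 0` check with `StringJoiner`. The original check drops the delimiter after a leading value whose `toString()` is empty.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.StringJoiner;

// Sketch of DefaultEventDataFormat.serialize() with an explicit charset.
class DelimitedUtf8Format {
    private String delimiter = ",";

    DelimitedUtf8Format withFieldDelimiter(String delimiter) {
        this.delimiter = delimiter;
        return this;
    }

    byte[] serialize(List<Object> values) {
        // StringJoiner puts the delimiter between every pair of values, even when a value
        // renders as an empty string; String.valueOf prints "null" instead of throwing.
        StringJoiner joiner = new StringJoiner(delimiter);
        for (Object value : values) {
            joiner.add(String.valueOf(value));
        }
        // Explicit charset: the bytes no longer depend on the JVM's default encoding.
        return joiner.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```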


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173631711
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java ---
    @@ -31,116 +30,131 @@
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.Map;
    -import java.util.concurrent.ExecutionException;
    -
    -import java.util.Map;
    -import java.util.concurrent.ExecutionException;
    +import com.microsoft.azure.eventhubs.EventData;
    +import com.microsoft.azure.eventhubs.EventHubClient;
    +import com.microsoft.azure.eventhubs.PartitionSender;
    +import com.microsoft.azure.eventhubs.EventHubException;
     
     /**
      * A bolt that writes event message to EventHub.
    + * <p>
    + * <p>
    + * The implementation has two modes of operation:
    + * <ul>
    + * <li>partitionmode = true, One bolt for per partition write.</li>
    + * <li>partitionmode = false, use default partitioning key strategy to write to
    + * partition(s)</li>
    + * </ul>
    + * </p>
      */
     public class EventHubBolt extends BaseRichBolt {
    -	private static final long serialVersionUID = 1L;
    -	private static final Logger logger = LoggerFactory
    -			.getLogger(EventHubBolt.class);
    -
    -	protected OutputCollector collector;
    -	protected PartitionSender sender;
    -	protected EventHubClient ehClient;
    -	protected EventHubBoltConfig boltConfig;
    -
    -	public EventHubBolt(String connectionString, String entityPath) {
    -		boltConfig = new EventHubBoltConfig(connectionString, entityPath);
    -	}
    -
    -	public EventHubBolt(String userName, String password, String namespace,
    -			String entityPath, boolean partitionMode) {
    -		boltConfig = new EventHubBoltConfig(userName, password, namespace,
    -				entityPath, partitionMode);
    -	}
    -
    -	public EventHubBolt(EventHubBoltConfig config) {
    -		boltConfig = config;
    -	}
    -
    -	@Override
    -	public void prepare(Map<String, Object> config, TopologyContext context,
    -			OutputCollector collector) {
    -		this.collector = collector;
    -		String myPartitionId = null;
    -		if (boltConfig.getPartitionMode()) {
    -			// We can use the task index (starting from 0) as the partition ID
    -			myPartitionId = "" + context.getThisTaskIndex();
    -		}
    -		logger.info("creating sender: " + boltConfig.getConnectionString()
    -				+ ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
    -		try {
    -			ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
    -			if (boltConfig.getPartitionMode()) {
    -				sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
    -			}
    -		} catch (Exception ex) {
    -			collector.reportError(ex);
    -			throw new RuntimeException(ex);
    -		}
    -
    -	}
    -
    -	@Override
    -	public void execute(Tuple tuple) {
    -		try {
    -			EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple));
    -			if (boltConfig.getPartitionMode() && sender!=null) {
    -				sender.sendSync(sendEvent);
    -			}
    -			else if (boltConfig.getPartitionMode() && sender==null) {
    -				throw new EventHubException("Sender is null");
    -			}
    -			else if (!boltConfig.getPartitionMode() && ehClient!=null) {
    -				ehClient.sendSync(sendEvent);
    -			}
    -			else if (!boltConfig.getPartitionMode() && ehClient==null) {
    -				throw new EventHubException("ehclient is null");
    -			}
    -			collector.ack(tuple);
    -		} catch (EventHubException ex ) {
    -			collector.reportError(ex);
    -			collector.fail(tuple);
    -		} catch (ServiceBusException e) {
    -			collector.reportError(e);
    -			collector.fail(tuple);
    -		}
    -	}
    -
    -	@Override
    -	public void cleanup() {
    -		if(sender != null) {
    -			try {
    -				sender.close().whenComplete((voidargs,error)->{
    -					try{
    -						if(error!=null){
    -							logger.error("Exception during sender cleanup phase"+error.toString());
    -						}
    -						ehClient.closeSync();
    -					}catch (Exception e){
    -						logger.error("Exception during ehclient cleanup phase"+e.toString());
    -					}
    -				}).get();
    -			} catch (InterruptedException e) {
    -				logger.error("Exception occured during cleanup phase"+e.toString());
    -			} catch (ExecutionException e) {
    -				logger.error("Exception occured during cleanup phase"+e.toString());
    -			}
    -			logger.info("Eventhub Bolt cleaned up");
    -			sender = null;
    -			ehClient =  null;
    -		}
    -	}
    -
    -	@Override
    -	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    -
    -	}
    -
    +    private static final long serialVersionUID = 1L;
    +    private static final Logger logger = LoggerFactory.getLogger(EventHubBolt.class);
    +
    +    private ExecutorService executorService;
    +    protected OutputCollector collector;
    +    protected EventHubClient ehClient;
    +    protected PartitionSender sender;
    +    protected EventHubBoltConfig boltConfig;
    +
    +    /**
    +     * Constructs an instance that uses the specified connection string to connect
    +     * to an EventHub and write to the specified entityPath
    +     *
    +     * @param connectionString EventHub connection String
    +     * @param entityPath       entity path to write to
    +     */
    +    public EventHubBolt(String connectionString, String entityPath) {
    +        boltConfig = new EventHubBoltConfig(connectionString, entityPath);
    +    }
    +
    +    /**
    +     * Constructs an instance that connects to an EventHub using the specified
    +     * connection credentials.
    +     *
    +     * @param userName      UserName to connect as
    +     * @param password      Password to use
    +     * @param namespace     target namespace for the service bus
    +     * @param entityPath    Name of the event hub
    +     * @param partitionMode number of partitions
    +     */
    +    public EventHubBolt(String userName, String password, String namespace, String entityPath, boolean partitionMode) {
    +        boltConfig = new EventHubBoltConfig(userName, password, namespace, entityPath, partitionMode);
    +    }
    +
    +    /**
    +     * Constructs an instance using the specified configuration
    +     *
    +     * @param config EventHub connection and partition configuration
    +     */
    +    public EventHubBolt(EventHubBoltConfig config) {
    +        boltConfig = config;
    +    }
    +
    +    @Override
    +    public void prepare(Map<String, Object> config, TopologyContext context,
    +                        OutputCollector collector) {
    +        this.collector = collector;
    +        logger.info(String.format("Conn String: %s, PartitionMode: %s", boltConfig.getConnectionString(),
    +                String.valueOf(boltConfig.getPartitionMode())));
    +        try {
    +            executorService = Executors.newSingleThreadExecutor();
    +            ehClient = EventHubClient.createSync(boltConfig.getConnectionString(), executorService);
    +            if (boltConfig.getPartitionMode()) {
    +                String partitionId = String.valueOf(context.getThisTaskIndex());
    +                logger.info("Writing to partition id: " + partitionId);
    +                sender = ehClient.createPartitionSenderSync(partitionId);
    +            }
    +        } catch (Exception ex) {
    +            collector.reportError(ex);
    +            throw new RuntimeException(ex);
    +        }
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            EventData sendEvent = EventData.create(boltConfig.getEventDataFormat().serialize(tuple));
    +            if (sender == null) {
    +                ehClient.sendSync(sendEvent);
    +            } else {
    +                sender.sendSync(sendEvent);
    +            }
    +            collector.ack(tuple);
    +        } catch (EventHubException e) {
    +            collector.reportError(e);
    +            collector.fail(tuple);
    +        }
    +    }
    +
    +    @Override
    +    public void cleanup() {
    +        logger.debug("EventHubBolt cleanup");
    +        try {
    +            ehClient.close().whenCompleteAsync((voidargs, error) -> {
    --- End diff --
    
    Nit: Is there any reason to use `whenCompleteAsync` over `whenComplete`?
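For context on the nit, the two methods differ in where the callback runs:

- `whenComplete` runs its callback on the thread that completes the future, or on the calling thread if the future is already done.
- `whenCompleteAsync` without an executor argument hands the callback to the default async executor, usually `ForkJoinPool.commonPool()`.

If `cleanup()` still blocks on `get()` the way the previous version did, that extra hand-off doesn't buy anything.

Below is a minimal sketch assuming a close operation that returns `CompletableFuture<Void>`. `CleanupSketch` and `fakeClose` are hypothetical stand-ins for the bolt and `ehClient.close()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;

class CleanupSketch {
    // Records which thread ran the completion callback.
    final AtomicReference<String> callbackThread = new AtomicReference<>();

    // Hypothetical stand-in for ehClient.close(): completes on a thread of the given executor.
    static CompletableFuture<Void> fakeClose(ExecutorService executor) {
        return CompletableFuture.runAsync(() -> { }, executor);
    }

    void cleanup(ExecutorService executor) {
        try {
            // whenComplete: no extra task is scheduled; the callback runs on the completing
            // thread (or this one, if close already finished). get() returns only after
            // both the close and the callback are done.
            fakeClose(executor).whenComplete((ignored, error) -> {
                callbackThread.set(Thread.currentThread().getName());
                if (error != null) {
                    System.err.println("Exception during client close: " + error);
                }
            }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } catch (ExecutionException e) {
            System.err.println("Exception during cleanup: " + e.getCause());
        } finally {
            // Release the executor that was handed to the client.
            executor.shutdown();
        }
    }
}
```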


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173634426
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/IEventHubReceiver.java ---
    @@ -0,0 +1,78 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.core;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +
    +import com.microsoft.azure.eventhubs.EventData;
    +import com.microsoft.azure.eventhubs.EventHubException;
    +
    +/**
    + * EventHub based Receiver contracts
    + *
    + */
    +public interface IEventHubReceiver {
    +
    +	/**
    +	 * Open / Establish connection to Eventhub given filters. The partition to
    +	 * receive events from will be specified in an implementation specific way.
    +	 * 
    +	 * @param filter
    +	 *            offset or timestamp based filter
    +	 * @throws EventHubException
    +	 * @see {@link IEventFilter} {@link OffsetFilter} {@link TimestampFilter}
    +	 */
    +	void open(IEventFilter filter) throws IOException, EventHubException ;
    +
    +	/**
    +	 * Cleanup and close connection to Eventhub
    +	 */
    +	void close();
    +
    +	/**
    +	 * Check if connection to eventhub is active
    +	 * 
    +	 * @return
    --- End diff --
    
    Just remove this if you're not writing anything here, since it's just noise at that point. Same for the `@throws` above.


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173631928
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java ---
    @@ -17,95 +17,185 @@
      *******************************************************************************/
     package org.apache.storm.eventhubs.bolt;
     
    -import com.microsoft.azure.servicebus.ConnectionStringBuilder;
    -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
    -
     import java.io.Serializable;
    +import java.net.URI;
    +import java.net.URISyntaxException;
     
    -import java.io.Serializable;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.format.DefaultEventDataFormat;
    +import org.apache.storm.eventhubs.format.IEventDataFormat;
    +
    +import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
     
    -/*
    +/**
      * EventHubs bolt configurations
    + * <p>
    + * Partition mode: partitionMode=true, in this mode each bolt task will write to
    + * a partition with the same id as that of the task index. For this mode, the
    + * number of bolt tasks must match the number of partitions.
    + * <p>
    + * partitionMode=false, default setting. There is no affinity between bolt tasks
    + * and partitions. Events are written to partitions as determined by the
    + * EventHub partitioning logic.
      *
    - * Partition mode:
    - * With partitionMode=true you need to create the same number of tasks as the number of 
    - * EventHubs partitions, and each bolt task will only send data to one partition.
    - * The partition ID is the task ID of the bolt.
    - * 
    - * Event format:
    - * The formatter to convert tuple to bytes for EventHubs.
    - * if null, the default format is common delimited tuple fields.
    + * @see IEventDataFormat
      */
     public class EventHubBoltConfig implements Serializable {
    -  private static final long serialVersionUID = 1L;
    -  
    -  private String connectionString;
    -  private final String entityPath;
    -  protected boolean partitionMode;
    -  protected IEventDataFormat dataFormat;
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath) {
    -    this(connectionString, entityPath, false, null);
    -  }
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath,
    -      boolean partitionMode) {
    -    this(connectionString, entityPath, partitionMode, null);
    -  }
    -  
    -  public EventHubBoltConfig(String userName, String password, String namespace,
    -      String entityPath, boolean partitionMode) {
    -    this(userName, password, namespace,
    -        EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
    -  }
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath,
    -      boolean partitionMode, IEventDataFormat dataFormat) {
    -    this.connectionString = connectionString;
    -    this.entityPath = entityPath;
    -    this.partitionMode = partitionMode;
    -    this.dataFormat = dataFormat;
    -    if(this.dataFormat == null) {
    -      this.dataFormat = new DefaultEventDataFormat();
    +    private static final long serialVersionUID = 1L;
    +
    +    private String connectionString;
    +    protected boolean partitionMode;
    +    protected IEventDataFormat dataFormat;
    +
    +    /**
    +     * Constructs an instance with specified connection string, and eventhub name
    +     * The @link {@link #partitionMode} is set to false.
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param entityPath       EventHub name
    +     */
    +    public EventHubBoltConfig(final String connectionString, final String entityPath) {
    +        this(connectionString, entityPath, false, null);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified connection string, eventhub name and
    +     * partition mode.
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param entityPath       EventHub name
    +     * @param partitionMode    partitionMode to apply
    +     */
    +    public EventHubBoltConfig(String connectionString, String entityPath, boolean partitionMode) {
    +        this(connectionString, entityPath, partitionMode, null);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified credentials, eventhub name and
    +     * partition mode.
    +     * <p>
    +     * <p>
    +     * For soverign clouds please use the constructor
    +     * {@link EventHubBolt#EventHubBolt(String, String)}.
    +     * </p>
    +     *
    +     * @param userName      user name to connect as
    +     * @param password      password for the user name
    +     * @param namespace     servicebus namespace
    +     * @param entityPath    EntityHub name
    +     * @param partitionMode Dictates write mode. if true will write to specific partitions
    +     */
    +    public EventHubBoltConfig(String userName, String password, String namespace, String entityPath,
    +                              boolean partitionMode) {
    +        this(userName, password, namespace, FieldConstants.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified connection string, and partition mode.
    +     * The specified {@link IEventDataFormat} will be used to format data to bytes
    +     * before
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param partitionMode    Dictates write mode. if true will write to specific partitions
    --- End diff --
    
    Same as above


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173631823
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java ---
    @@ -17,95 +17,185 @@
      *******************************************************************************/
     package org.apache.storm.eventhubs.bolt;
     
    -import com.microsoft.azure.servicebus.ConnectionStringBuilder;
    -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
    -
     import java.io.Serializable;
    +import java.net.URI;
    +import java.net.URISyntaxException;
     
    -import java.io.Serializable;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.format.DefaultEventDataFormat;
    +import org.apache.storm.eventhubs.format.IEventDataFormat;
    +
    +import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
     
    -/*
    +/**
      * EventHubs bolt configurations
    + * <p>
    + * Partition mode: partitionMode=true, in this mode each bolt task will write to
    + * a partition with the same id as that of the task index. For this mode, the
    + * number of bolt tasks must match the number of partitions.
    + * <p>
    + * partitionMode=false, default setting. There is no affinity between bolt tasks
    + * and partitions. Events are written to partitions as determined by the
    + * EventHub partitioning logic.
      *
    - * Partition mode:
    - * With partitionMode=true you need to create the same number of tasks as the number of 
    - * EventHubs partitions, and each bolt task will only send data to one partition.
    - * The partition ID is the task ID of the bolt.
    - * 
    - * Event format:
    - * The formatter to convert tuple to bytes for EventHubs.
    - * if null, the default format is common delimited tuple fields.
    + * @see IEventDataFormat
      */
     public class EventHubBoltConfig implements Serializable {
    -  private static final long serialVersionUID = 1L;
    -  
    -  private String connectionString;
    -  private final String entityPath;
    -  protected boolean partitionMode;
    -  protected IEventDataFormat dataFormat;
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath) {
    -    this(connectionString, entityPath, false, null);
    -  }
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath,
    -      boolean partitionMode) {
    -    this(connectionString, entityPath, partitionMode, null);
    -  }
    -  
    -  public EventHubBoltConfig(String userName, String password, String namespace,
    -      String entityPath, boolean partitionMode) {
    -    this(userName, password, namespace,
    -        EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
    -  }
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath,
    -      boolean partitionMode, IEventDataFormat dataFormat) {
    -    this.connectionString = connectionString;
    -    this.entityPath = entityPath;
    -    this.partitionMode = partitionMode;
    -    this.dataFormat = dataFormat;
    -    if(this.dataFormat == null) {
    -      this.dataFormat = new DefaultEventDataFormat();
    +    private static final long serialVersionUID = 1L;
    +
    +    private String connectionString;
    +    protected boolean partitionMode;
    +    protected IEventDataFormat dataFormat;
    +
    +    /**
    +     * Constructs an instance with specified connection string, and eventhub name
    +     * The @link {@link #partitionMode} is set to false.
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param entityPath       EventHub name
    +     */
    +    public EventHubBoltConfig(final String connectionString, final String entityPath) {
    +        this(connectionString, entityPath, false, null);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified connection string, eventhub name and
    +     * partition mode.
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param entityPath       EventHub name
    +     * @param partitionMode    partitionMode to apply
    +     */
    +    public EventHubBoltConfig(String connectionString, String entityPath, boolean partitionMode) {
    +        this(connectionString, entityPath, partitionMode, null);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified credentials, eventhub name and
    +     * partition mode.
    +     * <p>
    +     * <p>
    +     * For soverign clouds please use the constructor
    --- End diff --
    
    Nit: soverign -> sovereign(?)


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173635745
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/format/EventHubMessageDataScheme.java ---
    @@ -0,0 +1,54 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.format;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.apache.storm.eventhubs.core.EventHubMessage;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.tuple.Fields;
    +
    +import com.microsoft.azure.eventhubs.EventData;
    +
    +/**
    + * This scheme constructs an {@link EventHubMessage} object from the received
    + * EventHub events, and exposes the constructed EventHubMessage object as a single
    + * tuple.
    + *
    + * @see EventHubMessage
    + */
    +public class EventHubMessageDataScheme implements IEventDataScheme {
    +
    +    private static final long serialVersionUID = 5548996695376773616L;
    +
    +    @Override
    +    public Fields getOutputFields() {
    +        List<String> fields = new LinkedList<String>();
    +        fields.add(FieldConstants.MESSAGE_FIELD);
    +
    +        return new Fields(fields);
    +    }
    +
    +    @Override
    +    public List<Object> deserialize(EventHubMessage eventHubMessage) {
    +        List<Object> contents = new LinkedList<Object>();
    +        contents.add(eventHubMessage);
    --- End diff --
    
    Are you sure this works? EventHubMessage doesn't look serializable.
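One way to check the concern: Storm serializes tuple values with Kryo and, depending on the Storm version and the `topology.fall.back.on.java.serialization` setting, may fall back to plain Java serialization. Tuples that stay inside one worker may not be serialized at all, so a problem like this can surface only once tuples cross workers. The sketch below tests plain Java serializability with an `ObjectOutputStream` write. `PlainMessage`, `SerializableMessage` and `SerializationCheck` are hypothetical names; registering a Kryo serializer for `EventHubMessage` would be an alternative fix.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical message type that does NOT implement Serializable. */
class PlainMessage {
    final byte[] payload;

    PlainMessage(byte[] payload) {
        this.payload = payload;
    }
}

/** Same shape, but opting in to Java serialization (hypothetical). */
class SerializableMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    final byte[] payload;

    SerializableMessage(byte[] payload) {
        this.payload = payload;
    }
}

final class SerializationCheck {
    private SerializationCheck() {}

    /** Returns true if plain Java serialization can write {@code value} without error. */
    static boolean isJavaSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            // Thrown when the object (or anything it references) is not Serializable.
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```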


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173631476
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java ---
    @@ -31,116 +30,131 @@
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.Map;
    -import java.util.concurrent.ExecutionException;
    -
    -import java.util.Map;
    -import java.util.concurrent.ExecutionException;
    +import com.microsoft.azure.eventhubs.EventData;
    +import com.microsoft.azure.eventhubs.EventHubClient;
    +import com.microsoft.azure.eventhubs.PartitionSender;
    +import com.microsoft.azure.eventhubs.EventHubException;
     
     /**
      * A bolt that writes event message to EventHub.
    + * <p>
    + * <p>
    + * The implementation has two modes of operation:
    + * <ul>
    + * <li>partitionmode = true, One bolt for per partition write.</li>
    --- End diff --
    
    Nit: An enum seems like a good fit here.
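A minimal sketch of what the enum could look like, assuming the two modes described in the Javadoc (task-to-partition affinity vs. letting Event Hubs pick the partition). `WriteMode`, its constants and `BoltWriteConfig` are hypothetical names, not part of storm-eventhubs. The `fromPartitionModeFlag` helper lets existing boolean-based constructors delegate to the enum.

```java
import java.util.Objects;

/** Hypothetical replacement for the boolean partitionMode flag. */
enum WriteMode {
    /** Each bolt task writes only to the partition whose id equals its task index. */
    PARTITION_AFFINITY,
    /** No task/partition affinity; Event Hubs decides the target partition. */
    SERVICE_ROUTED;

    /** Maps the legacy boolean flag onto the enum for backwards-compatible constructors. */
    static WriteMode fromPartitionModeFlag(boolean partitionMode) {
        return partitionMode ? PARTITION_AFFINITY : SERVICE_ROUTED;
    }
}

/** Shows a config carrying the enum instead of a boolean (hypothetical). */
final class BoltWriteConfig {
    private final WriteMode writeMode;

    BoltWriteConfig(WriteMode writeMode) {
        this.writeMode = Objects.requireNonNull(writeMode, "writeMode");
    }

    WriteMode getWriteMode() {
        return writeMode;
    }
}
```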


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173634302
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/EventHubReceiverImpl.java ---
    @@ -0,0 +1,155 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.core;
    +
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +
    +import com.microsoft.azure.eventhubs.EventHubException;
    +import org.apache.storm.metric.api.CountMetric;
    +import org.apache.storm.metric.api.MeanReducer;
    +import org.apache.storm.metric.api.ReducedMetric;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.collect.Iterables;
    +import com.microsoft.azure.eventhubs.EventData;
    +import com.microsoft.azure.eventhubs.EventHubClient;
    +import com.microsoft.azure.eventhubs.PartitionReceiver;
    +
    +/**
    + * {@link PartitionReceiver} based implementation to receives messages from a
    + * given Eventhub partition
    + *
    + */
    +public class EventHubReceiverImpl implements IEventHubReceiver {
    +	private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class);
    +
    +	private final EventHubConfig eventHubConfig;
    +	private final String partitionId;
    +
    +	private PartitionReceiver receiver;
    +	private EventHubClient ehClient;
    +	private ExecutorService executorService;
    +
    +	private ReducedMetric receiveApiLatencyMean;
    +	private CountMetric receiveApiCallCount;
    +	private CountMetric receiveMessageCount;
    +
    +	/**
    +	 * Creates a new instance based on provided configuration. The connection, and
    +	 * consumer group settings are read from the passed in EventHubConfig instance.
    +	 * 
    +	 * @param config
    +	 *            Connection, consumer group settings
    +	 * @param partitionId
    +	 *            target partition id to connect to and read from
    +	 */
    +	public EventHubReceiverImpl(EventHubConfig config, String partitionId) {
    +		this.partitionId = partitionId;
    +		this.eventHubConfig = config;
    +
    +		receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
    +		receiveApiCallCount = new CountMetric();
    +		receiveMessageCount = new CountMetric();
    +	}
    +
    +	@Override
    +	public void open(IEventFilter filter) throws IOException, EventHubException {
    +		long start = System.currentTimeMillis();
    +		logger.debug(String.format("Creating EventHub Client: partitionId: %s, filter value:%s, prefetchCount: %s",
    +				partitionId, filter.toString(), String.valueOf(eventHubConfig.getPrefetchCount())));
    +		executorService = Executors.newSingleThreadExecutor();
    +		ehClient = EventHubClient.createSync(eventHubConfig.getConnectionString(), executorService);
    +		receiver = PartitionReceiverFactory.createReceiver(ehClient, filter, eventHubConfig, partitionId);
    +		receiver.setPrefetchCount(eventHubConfig.getPrefetchCount());
    +		logger.debug("created eventhub receiver, time taken(ms): " + (System.currentTimeMillis() - start));
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (receiver == null)
    +			return;
    +
    +		try {
    +			receiver.close().whenCompleteAsync((voidargs, error) -> {
    +				try {
    +					if (error != null) {
    +						logger.error("Exception during receiver close phase: " + error.toString());
    +					}
    +					ehClient.closeSync();
    +				} catch (Exception e) {
    +					logger.error("Exception during ehclient close phase: " + e.toString());
    +				}
    +			}).get();
    +		} catch (InterruptedException | ExecutionException e) {
    +			logger.warn("Exception occured during close phase: " + e.toString());
    +		}
    +
    +		executorService.shutdown();
    +
    +		logger.info("closed eventhub receiver: partitionId=" + partitionId);
    +		ehClient = null;
    +		receiver = null;
    +		executorService = null;
    +	}
    +
    +	@Override
    +	public boolean isOpen() {
    +		return (receiver != null);
    +	}
    +
    +	@Override
    +	public Iterable<EventData> receive() {
    +		return receive(eventHubConfig.getReceiveEventsMaxCount());
    +	}
    +
    +	@Override
    +	public Iterable<EventData> receive(int batchSize) {
    +		long start = System.currentTimeMillis();
    +		Iterable<EventData> receivedEvents = null;
    +
    +		try {
    +			receivedEvents = receiver.receiveSync(batchSize);
    +			if (receivedEvents != null) {
    +				logger.debug("Batchsize: " + batchSize + ", Received event count: " + Iterables.size(receivedEvents));
    +			}
    +		} catch (EventHubException e) {
    +			logger.error("Exception occured during receive" + e.toString());
    +			return null;
    +		}
    +		long end = System.currentTimeMillis();
    +		long millis = (end - start);
    +		receiveApiLatencyMean.update(millis);
    +		receiveApiCallCount.incr();
    --- End diff --
    
    I'm not sure, but shouldn't this also be updated when the exception above is hit?
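If the metrics should cover failed calls too, moving the updates into a `finally` block is one option. The sketch below assumes that behavior. `TimedReceiveSketch` is hypothetical, and `AtomicLong` counters stand in for Storm's `CountMetric`/`ReducedMetric` so it has no Storm dependency. Returning an empty list instead of `null` on failure is a choice made here, not what the PR does.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical receive wrapper that records metrics on both success and failure. */
final class TimedReceiveSketch {
    final AtomicLong callCount = new AtomicLong();
    final AtomicLong failureCount = new AtomicLong();
    final AtomicLong totalLatencyMillis = new AtomicLong();

    /** Runs one receive call; latency and call count are recorded even if it throws. */
    <T> List<T> receive(Callable<List<T>> receiveCall) {
        long start = System.currentTimeMillis();
        try {
            return receiveCall.call();
        } catch (Exception e) {
            // Count the failure separately and hand back an empty batch.
            failureCount.incrementAndGet();
            return Collections.emptyList();
        } finally {
            // Executed on both paths, so failed calls still show up in the metrics.
            totalLatencyMillis.addAndGet(System.currentTimeMillis() - start);
            callCount.incrementAndGet();
        }
    }
}
```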


---

[GitHub] storm issue #2588: Microsoft Azure EventHubs Storm Spout and Bolt improvemen...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2588
  
    @SreeramGarlapati A quick Google search suggests that you can try adding -Xrenormalize to the merge command. https://stackoverflow.com/a/12194759/8845188


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173634219
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/EventHubReceiverImpl.java ---
    @@ -0,0 +1,155 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.core;
    +
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +
    +import com.microsoft.azure.eventhubs.EventHubException;
    +import org.apache.storm.metric.api.CountMetric;
    +import org.apache.storm.metric.api.MeanReducer;
    +import org.apache.storm.metric.api.ReducedMetric;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.collect.Iterables;
    +import com.microsoft.azure.eventhubs.EventData;
    +import com.microsoft.azure.eventhubs.EventHubClient;
    +import com.microsoft.azure.eventhubs.PartitionReceiver;
    +
    +/**
    + * {@link PartitionReceiver} based implementation to receives messages from a
    + * given Eventhub partition
    + *
    + */
    +public class EventHubReceiverImpl implements IEventHubReceiver {
    +	private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class);
    +
    +	private final EventHubConfig eventHubConfig;
    +	private final String partitionId;
    +
    +	private PartitionReceiver receiver;
    +	private EventHubClient ehClient;
    +	private ExecutorService executorService;
    +
    +	private ReducedMetric receiveApiLatencyMean;
    +	private CountMetric receiveApiCallCount;
    +	private CountMetric receiveMessageCount;
    +
    +	/**
    +	 * Creates a new instance based on provided configuration. The connection, and
    +	 * consumer group settings are read from the passed in EventHubConfig instance.
    +	 * 
    +	 * @param config
    +	 *            Connection, consumer group settings
    +	 * @param partitionId
    +	 *            target partition id to connect to and read from
    +	 */
    +	public EventHubReceiverImpl(EventHubConfig config, String partitionId) {
    +		this.partitionId = partitionId;
    +		this.eventHubConfig = config;
    +
    +		receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
    +		receiveApiCallCount = new CountMetric();
    +		receiveMessageCount = new CountMetric();
    +	}
    +
    +	@Override
    +	public void open(IEventFilter filter) throws IOException, EventHubException {
    +		long start = System.currentTimeMillis();
    +		logger.debug(String.format("Creating EventHub Client: partitionId: %s, filter value:%s, prefetchCount: %s",
    +				partitionId, filter.toString(), String.valueOf(eventHubConfig.getPrefetchCount())));
    +		executorService = Executors.newSingleThreadExecutor();
    +		ehClient = EventHubClient.createSync(eventHubConfig.getConnectionString(), executorService);
    +		receiver = PartitionReceiverFactory.createReceiver(ehClient, filter, eventHubConfig, partitionId);
    +		receiver.setPrefetchCount(eventHubConfig.getPrefetchCount());
    +		logger.debug("created eventhub receiver, time taken(ms): " + (System.currentTimeMillis() - start));
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (receiver == null)
    +			return;
    +
    +		try {
    +			receiver.close().whenCompleteAsync((voidargs, error) -> {
    +				try {
    +					if (error != null) {
    +						logger.error("Exception during receiver close phase: " + error.toString());
    +					}
    +					ehClient.closeSync();
    --- End diff --
    
    Why is the shutdown order here different from the bolt's? In the bolt, isn't the client shut down before the sender?
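For reference, one consistent order that both the spout and the bolt could share: close the link (receiver or sender) while its client is still usable, then the client, then the executor the client was created with. This sketch assumes that order is acceptable for the Event Hubs client; it does not claim the client requires it. `OrderedShutdown` and the string log are hypothetical, and plain `AutoCloseable`s stand in for the real client types.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper applying one teardown order: link, then client, then executor. */
final class OrderedShutdown {
    private OrderedShutdown() {}

    static void closeAll(AutoCloseable link, AutoCloseable client, ExecutorService executor, List<String> log) {
        // 1. Close the receiver/sender link while its parent client is still open.
        closeQuietly(link, "link", log);
        // 2. Close the client (the underlying connection) once its links are gone.
        closeQuietly(client, "client", log);
        // 3. Stop the executor last, since the client may use it while closing.
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        log.add("executor");
    }

    /** Closes a resource, recording success or failure without aborting the sequence. */
    private static void closeQuietly(AutoCloseable resource, String name, List<String> log) {
        try {
            resource.close();
            log.add(name);
        } catch (Exception e) {
            log.add(name + " failed: " + e.getMessage());
        }
    }
}
```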


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173636020
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java ---
    @@ -17,246 +17,281 @@
      *******************************************************************************/
     package org.apache.storm.eventhubs.spout;
     
    -import com.google.common.base.Strings;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +
    +import org.apache.commons.lang.StringUtils;
     import org.apache.storm.Config;
    +import org.apache.storm.eventhubs.core.EventHubConfig;
    +import org.apache.storm.eventhubs.core.EventHubMessage;
    +import org.apache.storm.eventhubs.core.EventHubReceiverImpl;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.core.IEventHubReceiver;
    +import org.apache.storm.eventhubs.core.IEventHubReceiverFactory;
    +import org.apache.storm.eventhubs.core.IPartitionCoordinator;
    +import org.apache.storm.eventhubs.core.IPartitionManager;
    +import org.apache.storm.eventhubs.core.IPartitionManagerFactory;
    +import org.apache.storm.eventhubs.core.MessageId;
    +import org.apache.storm.eventhubs.core.PartitionManager;
    +import org.apache.storm.eventhubs.core.StaticPartitionCoordinator;
    +import org.apache.storm.eventhubs.state.IStateStore;
    +import org.apache.storm.eventhubs.state.ZookeeperStateStore;
     import org.apache.storm.metric.api.IMetric;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.HashMap;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.UUID;
    -
    -public class EventHubSpout extends BaseRichSpout {
    -
    -  private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class);
    -
    -  private final UUID instanceId;
    -  private final EventHubSpoutConfig eventHubConfig;
    -  private final IEventDataScheme scheme;
    -  private final int checkpointIntervalInSeconds;
    -
    -  private IStateStore stateStore;
    -  private IPartitionCoordinator partitionCoordinator;
    -  private IPartitionManagerFactory pmFactory;
    -  private IEventHubReceiverFactory recvFactory;
    -  private SpoutOutputCollector collector;
    -  private long lastCheckpointTime;
    -  private int currentPartitionIndex = -1;
    -
    -  public EventHubSpout(String username, String password, String namespace,
    -      String entityPath, int partitionCount) {
    -    this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount));
    -  }
    -
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig) {
    -    this(spoutConfig, null, null, null);
    -  }
    -  
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig,
    -      IStateStore store,
    -      IPartitionManagerFactory pmFactory,
    -      IEventHubReceiverFactory recvFactory) {
    -    this.eventHubConfig = spoutConfig;
    -    this.scheme = spoutConfig.getEventDataScheme();
    -    this.instanceId = UUID.randomUUID();
    -    this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds();
    -    this.lastCheckpointTime = System.currentTimeMillis();
    -    stateStore = store;
    -    this.pmFactory = pmFactory;
    -    if(this.pmFactory == null) {
    -      this.pmFactory = new IPartitionManagerFactory() {
    -        @Override
    -        public IPartitionManager create(EventHubSpoutConfig spoutConfig,
    -            String partitionId, IStateStore stateStore,
    -            IEventHubReceiver receiver) {
    -          return new PartitionManager(spoutConfig, partitionId,
    -              stateStore, receiver);
    -        }
    -      };
    +import com.google.common.base.Strings;
    +
    +/**
    + * Emit tuples (messages) from an Azure EventHub
    + */
    +public final class EventHubSpout extends BaseRichSpout {
    +
    +    private static final long serialVersionUID = -8460916098313963614L;
    +
    +    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubSpout.class);
    +
    +    private final EventHubSpoutConfig eventHubConfig;
    +    private final int checkpointIntervalInSeconds;
    +
    +    private IStateStore stateStore;
    +    private IPartitionCoordinator partitionCoordinator;
    +    private IPartitionManagerFactory pmFactory;
    +    private IEventHubReceiverFactory recvFactory;
    +    private SpoutOutputCollector collector;
    +    private long lastCheckpointTime;
    +    private int currentPartitionIndex = -1;
    +
    +    public EventHubSpout(
    +            final String username,
    +            final String password,
    +            final String namespace,
    +            final String entityPath,
    +            final int partitionCount) {
    +        this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount));
         }
    -    this.recvFactory = recvFactory;
    -    if(this.recvFactory == null) {
    -      this.recvFactory = new IEventHubReceiverFactory() {
    -        @Override
    -        public IEventHubReceiver create(EventHubSpoutConfig spoutConfig,
    -            String partitionId) {
    -          return new EventHubReceiverImpl(spoutConfig, partitionId);
    -        }
    -      };
    +
    +    public EventHubSpout(
    +            final String username,
    +            final String password,
    +            final String namespace,
    +            final String entityPath,
    +            final int partitionCount,
    +            final int batchSize) {
    +        this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount, batchSize));
         }
    -    
    -  }
    -  
    -  /**
    -   * This is a extracted method that is easy to test
    -   * @param config
    -   * @param totalTasks
    -   * @param taskIndex
    -   * @param collector
    -   * @throws Exception
    -   */
    -  public void preparePartitions(Map<String, Object> config, int totalTasks, int taskIndex, SpoutOutputCollector collector) throws Exception {
    -    this.collector = collector;
    -    if(stateStore == null) {
    -      String zkEndpointAddress = eventHubConfig.getZkConnectionString();
    -      if (zkEndpointAddress == null || zkEndpointAddress.length() == 0) {
    -        //use storm's zookeeper servers if not specified.
    -        @SuppressWarnings("unchecked")
    -        List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS);
    -        Integer zkPort = ((Number) config.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    -        StringBuilder sb = new StringBuilder();
    -        for (String zk : zkServers) {
    -          if (sb.length() > 0) {
    -            sb.append(',');
    -          }
    -          sb.append(zk+":"+zkPort);
    +
    +    public EventHubSpout(final EventHubSpoutConfig spoutConfig) {
    +        this(spoutConfig, null, null, null);
    +    }
    +
    +    public EventHubSpout(
    +            final EventHubSpoutConfig spoutConfig,
    +            final IStateStore store,
    +            final IPartitionManagerFactory pmFactory,
    +            final IEventHubReceiverFactory recvFactory) {
    +        this.eventHubConfig = spoutConfig;
    +        this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds();
    +        this.lastCheckpointTime = System.currentTimeMillis();
    +        stateStore = store;
    +        this.pmFactory = pmFactory;
    +        if (this.pmFactory == null) {
    +            this.pmFactory = new IPartitionManagerFactory() {
    +                private static final long serialVersionUID = -3134660797825594845L;
    --- End diff --
    
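    A quick search suggests that serializing anonymous classes is a bad idea: the Java serialization spec strongly discourages serializing inner classes, including anonymous ones, because their names and synthetic fields are compiler-dependent and, when created in a constructor like this, they hold a reference to the enclosing spout. Could you move this into a named class instead?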
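A minimal sketch of the suggested fix. It uses hypothetical, simplified stand-ins (`PartitionHandle`, `PartitionHandleFactory`) for `IPartitionManager`/`IPartitionManagerFactory`, whose real signatures take more parameters. The default factory becomes a named top-level class, so it has a stable name, captures nothing from the spout, and survives a serialization round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical, simplified stand-ins for IPartitionManager / IPartitionManagerFactory.
interface PartitionHandle {
    String partitionId();
}

interface PartitionHandleFactory extends Serializable {
    PartitionHandle create(String partitionId);
}

// Named top-level class instead of an anonymous one: stable class name,
// no implicit reference to an enclosing spout instance.
final class DefaultPartitionHandleFactory implements PartitionHandleFactory {
    private static final long serialVersionUID = 1L;

    @Override
    public PartitionHandle create(String partitionId) {
        // The returned handle is created at runtime on the worker and is not serialized.
        return () -> partitionId;
    }
}

final class SerializationCheck {
    private SerializationCheck() {
    }

    // Java-serializes and deserializes an object, roughly what happens when a topology is submitted.
    static Object roundTrip(Serializable obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

In the spout constructor the null check would then read `this.pmFactory = pmFactory != null ? pmFactory : new DefaultPartitionHandleFactory();`, and the same pattern applies to the anonymous `IEventHubReceiverFactory`.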


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173631909
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java ---
    @@ -17,95 +17,185 @@
      *******************************************************************************/
     package org.apache.storm.eventhubs.bolt;
     
    -import com.microsoft.azure.servicebus.ConnectionStringBuilder;
    -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
    -
     import java.io.Serializable;
    +import java.net.URI;
    +import java.net.URISyntaxException;
     
    -import java.io.Serializable;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.format.DefaultEventDataFormat;
    +import org.apache.storm.eventhubs.format.IEventDataFormat;
    +
    +import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
     
    -/*
    +/**
      * EventHubs bolt configurations
    + * <p>
    + * Partition mode: partitionMode=true, in this mode each bolt task will write to
    + * a partition with the same id as that of the task index. For this mode, the
    + * number of bolt tasks must match the number of partitions.
    + * <p>
    + * partitionMode=false, default setting. There is no affinity between bolt tasks
    + * and partitions. Events are written to partitions as determined by the
    + * EventHub partitioning logic.
      *
    - * Partition mode:
    - * With partitionMode=true you need to create the same number of tasks as the number of 
    - * EventHubs partitions, and each bolt task will only send data to one partition.
    - * The partition ID is the task ID of the bolt.
    - * 
    - * Event format:
    - * The formatter to convert tuple to bytes for EventHubs.
    - * if null, the default format is common delimited tuple fields.
    + * @see IEventDataFormat
      */
     public class EventHubBoltConfig implements Serializable {
    -  private static final long serialVersionUID = 1L;
    -  
    -  private String connectionString;
    -  private final String entityPath;
    -  protected boolean partitionMode;
    -  protected IEventDataFormat dataFormat;
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath) {
    -    this(connectionString, entityPath, false, null);
    -  }
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath,
    -      boolean partitionMode) {
    -    this(connectionString, entityPath, partitionMode, null);
    -  }
    -  
    -  public EventHubBoltConfig(String userName, String password, String namespace,
    -      String entityPath, boolean partitionMode) {
    -    this(userName, password, namespace,
    -        EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
    -  }
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath,
    -      boolean partitionMode, IEventDataFormat dataFormat) {
    -    this.connectionString = connectionString;
    -    this.entityPath = entityPath;
    -    this.partitionMode = partitionMode;
    -    this.dataFormat = dataFormat;
    -    if(this.dataFormat == null) {
    -      this.dataFormat = new DefaultEventDataFormat();
    +    private static final long serialVersionUID = 1L;
    +
    +    private String connectionString;
    +    protected boolean partitionMode;
    +    protected IEventDataFormat dataFormat;
    +
    +    /**
    +     * Constructs an instance with specified connection string, and eventhub name
    +     * The @link {@link #partitionMode} is set to false.
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param entityPath       EventHub name
    +     */
    +    public EventHubBoltConfig(final String connectionString, final String entityPath) {
    +        this(connectionString, entityPath, false, null);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified connection string, eventhub name and
    +     * partition mode.
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param entityPath       EventHub name
    +     * @param partitionMode    partitionMode to apply
    +     */
    +    public EventHubBoltConfig(String connectionString, String entityPath, boolean partitionMode) {
    +        this(connectionString, entityPath, partitionMode, null);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified credentials, eventhub name and
    +     * partition mode.
    +     * <p>
    +     * <p>
    +     * For soverign clouds please use the constructor
    +     * {@link EventHubBolt#EventHubBolt(String, String)}.
    +     * </p>
    +     *
    +     * @param userName      user name to connect as
    +     * @param password      password for the user name
    +     * @param namespace     servicebus namespace
    +     * @param entityPath    EntityHub name
    +     * @param partitionMode Dictates write mode. if true will write to specific partitions
    +     */
    +    public EventHubBoltConfig(String userName, String password, String namespace, String entityPath,
    +                              boolean partitionMode) {
    +        this(userName, password, namespace, FieldConstants.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified connection string, and partition mode.
    +     * The specified {@link IEventDataFormat} will be used to format data to bytes
    +     * before
    --- End diff --
    
    This Javadoc seems incomplete: the sentence stops at "before".
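One way the truncated Javadoc could be finished, sketched with hypothetical stand-ins (`TupleFormat` for `IEventDataFormat`, `CommaDelimitedFormat` for `DefaultEventDataFormat`, `BoltConfigSketch` for `EventHubBoltConfig`). It assumes, as the removed class comment and the null check in the old constructor suggest, that a null format falls back to a default delimited format; the real default's exact output may differ.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical stand-in for IEventDataFormat.
interface TupleFormat extends Serializable {
    byte[] serialize(List<Object> values);
}

// Hypothetical default: joins the tuple values with commas, encoded as UTF-8.
final class CommaDelimitedFormat implements TupleFormat {
    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(List<Object> values) {
        StringJoiner joiner = new StringJoiner(",");
        for (Object value : values) {
            joiner.add(String.valueOf(value));
        }
        return joiner.toString().getBytes(StandardCharsets.UTF_8);
    }
}

final class BoltConfigSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String connectionString;
    private final String entityPath;
    private final boolean partitionMode;
    private final TupleFormat dataFormat;

    /**
     * Constructs an instance with the specified connection string, EventHub name
     * and partition mode. The specified format is used to convert each tuple to
     * bytes before it is sent to the EventHub; if it is null, a default
     * comma-delimited format is used.
     */
    BoltConfigSketch(String connectionString, String entityPath, boolean partitionMode, TupleFormat dataFormat) {
        this.connectionString = connectionString;
        this.entityPath = entityPath;
        this.partitionMode = partitionMode;
        this.dataFormat = dataFormat != null ? dataFormat : new CommaDelimitedFormat();
    }

    boolean partitionMode() {
        return partitionMode;
    }

    TupleFormat dataFormat() {
        return dataFormat;
    }
}
```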


---

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173636774
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/state/ZookeeperStateStore.java ---
    @@ -0,0 +1,111 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.state;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.CuratorFrameworkFactory;
    +import org.apache.curator.retry.RetryNTimes;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.microsoft.azure.eventhubs.impl.StringUtil;
    +
    +/**
    + * Zookeeper based implementation of the state store.
    + *
    + * @see IStateStore
    + */
    +public class ZookeeperStateStore implements IStateStore {
    +    private static final long serialVersionUID = -995647135239199102L;
    +    private static final Logger logger = LoggerFactory.getLogger(ZookeeperStateStore.class);
    +    private static final int DEFAULT_RETRIES = 3;
    +    private static final int DEFAULT_RETRY_INTERVAL_MS = 100;
    +    private static final String ZK_LOCAL_URL = "localhost:2181";
    +
    +    private final String zookeeperConnectionString;
    +    private final CuratorFramework curatorFramework;
    +
    +    /**
    +     * Creates a new instance. No connection to Zookeeper is established yet.
    +     *
    +     * @param zookeeperConnectionString Zookeeper connection string
    +     */
    +    public ZookeeperStateStore(String zookeeperConnectionString) {
    +        this(zookeeperConnectionString, DEFAULT_RETRIES, DEFAULT_RETRY_INTERVAL_MS);
    +    }
    +
    +    /**
    +     * Creates a new instance. No connection to Zookeeper is established yet.
    +     *
    +     * @param connectionString Zookeeper connection string (example: zk1.azurehdinsight.net:2181)
    +     * @param retries          number of times to retry for transient failures
    +     * @param retryInterval    Sleep interval (in ms) between retry attempts
    +     */
    +    public ZookeeperStateStore(String connectionString, int retries, int retryInterval) {
    +        zookeeperConnectionString = StringUtil.isNullOrWhiteSpace(connectionString) ? ZK_LOCAL_URL : connectionString;
    +        logger.debug("using ZKConnectionString: " + zookeeperConnectionString);
    +
    +        curatorFramework = CuratorFrameworkFactory.newClient(zookeeperConnectionString,
    +                new RetryNTimes(retries, retryInterval));
    +    }
    +
    +    @Override
    +    public void open() {
    +        curatorFramework.start();
    +    }
    +
    +    @Override
    +    public void close() {
    +        curatorFramework.close();
    +    }
    +
    +    @Override
    +    public void saveData(String statePath, String data) {
    +        data = StringUtil.isNullOrWhiteSpace(data) ? "" : data;
    +        byte[] bytes = data.getBytes();
    --- End diff --
    
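    You should specify a charset explicitly, e.g. `data.getBytes(StandardCharsets.UTF_8)`. `String.getBytes()` uses the platform default charset, so different hosts could write and read different bytes for the same state.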
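A minimal sketch of explicit-charset encoding for the state data, assuming a hypothetical `StateCodec` helper (the real `ZookeeperStateStore` would more likely call `getBytes`/`new String` inline). Null or blank input is treated as empty, approximating the PR's `StringUtil.isNullOrWhiteSpace` check; decoding must use the same charset as encoding.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper; not part of storm-eventhubs.
final class StateCodec {
    private StateCodec() {
    }

    // Always UTF-8, independent of the JVM's default charset (file.encoding).
    static byte[] encode(String data) {
        String value = (data == null || data.trim().isEmpty()) ? "" : data;
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Decode with the same charset that was used to encode.
    static String decode(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
```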


---

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173635852
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java ---
    @@ -17,246 +17,281 @@
      *******************************************************************************/
     package org.apache.storm.eventhubs.spout;
     
    -import com.google.common.base.Strings;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +
    +import org.apache.commons.lang.StringUtils;
     import org.apache.storm.Config;
    +import org.apache.storm.eventhubs.core.EventHubConfig;
    +import org.apache.storm.eventhubs.core.EventHubMessage;
    +import org.apache.storm.eventhubs.core.EventHubReceiverImpl;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.core.IEventHubReceiver;
    +import org.apache.storm.eventhubs.core.IEventHubReceiverFactory;
    +import org.apache.storm.eventhubs.core.IPartitionCoordinator;
    +import org.apache.storm.eventhubs.core.IPartitionManager;
    +import org.apache.storm.eventhubs.core.IPartitionManagerFactory;
    +import org.apache.storm.eventhubs.core.MessageId;
    +import org.apache.storm.eventhubs.core.PartitionManager;
    +import org.apache.storm.eventhubs.core.StaticPartitionCoordinator;
    +import org.apache.storm.eventhubs.state.IStateStore;
    +import org.apache.storm.eventhubs.state.ZookeeperStateStore;
     import org.apache.storm.metric.api.IMetric;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.HashMap;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.UUID;
    -
    -public class EventHubSpout extends BaseRichSpout {
    -
    -  private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class);
    -
    -  private final UUID instanceId;
    -  private final EventHubSpoutConfig eventHubConfig;
    -  private final IEventDataScheme scheme;
    -  private final int checkpointIntervalInSeconds;
    -
    -  private IStateStore stateStore;
    -  private IPartitionCoordinator partitionCoordinator;
    -  private IPartitionManagerFactory pmFactory;
    -  private IEventHubReceiverFactory recvFactory;
    -  private SpoutOutputCollector collector;
    -  private long lastCheckpointTime;
    -  private int currentPartitionIndex = -1;
    -
    -  public EventHubSpout(String username, String password, String namespace,
    -      String entityPath, int partitionCount) {
    -    this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount));
    -  }
    -
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig) {
    -    this(spoutConfig, null, null, null);
    -  }
    -  
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig,
    -      IStateStore store,
    -      IPartitionManagerFactory pmFactory,
    -      IEventHubReceiverFactory recvFactory) {
    -    this.eventHubConfig = spoutConfig;
    -    this.scheme = spoutConfig.getEventDataScheme();
    -    this.instanceId = UUID.randomUUID();
    -    this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds();
    -    this.lastCheckpointTime = System.currentTimeMillis();
    -    stateStore = store;
    -    this.pmFactory = pmFactory;
    -    if(this.pmFactory == null) {
    -      this.pmFactory = new IPartitionManagerFactory() {
    -        @Override
    -        public IPartitionManager create(EventHubSpoutConfig spoutConfig,
    -            String partitionId, IStateStore stateStore,
    -            IEventHubReceiver receiver) {
    -          return new PartitionManager(spoutConfig, partitionId,
    -              stateStore, receiver);
    -        }
    -      };
    +import com.google.common.base.Strings;
    +
    +/**
    + * Emit tuples (messages) from an Azure EventHub
    + */
    +public final class EventHubSpout extends BaseRichSpout {
    +
    +    private static final long serialVersionUID = -8460916098313963614L;
    +
    +    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubSpout.class);
    +
    +    private final EventHubSpoutConfig eventHubConfig;
    +    private final int checkpointIntervalInSeconds;
    +
    +    private IStateStore stateStore;
    --- End diff --
    
    Nit: Consider declaring the fields that are not meant to be serialized (e.g. `collector`, which is only set up at runtime) as `transient`, for readability.
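A minimal, Storm-free sketch of the suggestion, using a hypothetical `SketchSpout` with one configuration field and one runtime-only field. After a serialization round trip, roughly what happens when a topology is shipped to the workers, the transient field comes back `null` and is rebuilt in `open()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a spout; not the real EventHubSpout.
class SketchSpout implements Serializable {
    private static final long serialVersionUID = 1L;

    // Configuration: serialized together with the spout.
    private final String entityPath;

    // Runtime-only state (think SpoutOutputCollector): transient, so
    // serialization skips it and open() recreates it on the worker.
    private transient List<String> emitted;

    SketchSpout(String entityPath) {
        this.entityPath = entityPath;
    }

    void open() {
        emitted = new ArrayList<>();
    }

    String entityPath() {
        return entityPath;
    }

    boolean isOpen() {
        return emitted != null;
    }

    // Serialize and deserialize, similar to shipping the spout to a worker.
    static SketchSpout roundTrip(SketchSpout spout) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(spout);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchSpout) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```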


---

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173635422
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/PartitionManager.java ---
    @@ -0,0 +1,132 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.core;
    +
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.TreeSet;
    +
    +import org.apache.storm.eventhubs.state.IStateStore;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.esotericsoftware.minlog.Log;
    +import com.microsoft.azure.eventhubs.EventData;
    +
    +public class PartitionManager extends SimplePartitionManager {
    +    private static final Logger logger = LoggerFactory.getLogger(PartitionManager.class);
    +
    +    // all sent events are stored in pending
    +    private final Map<String, EventHubMessage> pending;
    +
    +    // all failed events are put in toResend, which is sorted by event's offset
    +    private final TreeSet<EventHubMessage> toResend;
    +
    +    private final TreeSet<EventHubMessage> waitingToEmit;
    +
    +    public PartitionManager(EventHubConfig ehConfig, String partitionId, IStateStore stateStore,
    +                            IEventHubReceiver receiver) {
    +        super(ehConfig, partitionId, stateStore, receiver);
    +
    +        this.pending = new LinkedHashMap<String, EventHubMessage>();
    +        this.toResend = new TreeSet<EventHubMessage>();
    +        this.waitingToEmit = new TreeSet<EventHubMessage>();
    +    }
    +
    +    private void fill() {
    +        Iterable<EventData> receivedEvents = receiver.receive(config.getReceiveEventsMaxCount());
    +        if (receivedEvents == null || receivedEvents.spliterator().getExactSizeIfKnown() == 0) {
    +            logger.debug("No messages received from EventHub.");
    +            return;
    +        }
    +
    +        String startOffset = null;
    +        String endOffset = null;
    +        for (EventData ed : receivedEvents) {
    +            EventHubMessage ehm = new EventHubMessage(ed, partitionId);
    +            startOffset = (startOffset == null) ? ehm.getOffset() : startOffset;
    +            endOffset = ehm.getOffset();
    +            waitingToEmit.add(ehm);
    +        }
    +
    +        logger.debug("Received Messages Start Offset: " + startOffset + ", End Offset: " + endOffset);
    +    }
    +
    +    @Override
    +    public EventHubMessage receive() {
    +        logger.debug("Retrieving messages for partition: " + partitionId);
    +        int countToRetrieve = pending.size() - config.getMaxPendingMsgsPerPartition();
    --- End diff --
    
    The name on this is confusing. `pending.size() - config.getMaxPendingMsgsPerPartition()` is how far `pending` is over the limit (zero or negative while there is still room), not the number of messages to retrieve.
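A small sketch that separates the two quantities. Only the subtraction comes from the PR; `PendingLimit`, `pendingOverLimit()` and `remainingCapacity()` are hypothetical names chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the pending-limit arithmetic in PartitionManager.receive().
final class PendingLimit {
    private final Map<String, String> pending = new LinkedHashMap<>();
    private final int maxPendingMsgsPerPartition;

    PendingLimit(int maxPendingMsgsPerPartition) {
        this.maxPendingMsgsPerPartition = maxPendingMsgsPerPartition;
    }

    void addPending(String offset) {
        pending.put(offset, offset);
    }

    // The PR's expression: how far pending is over the limit (<= 0 while there is room).
    int pendingOverLimit() {
        return pending.size() - maxPendingMsgsPerPartition;
    }

    // What a "count to retrieve" would more naturally mean: room left before the limit.
    int remainingCapacity() {
        return Math.max(0, maxPendingMsgsPerPartition - pending.size());
    }
}
```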


---

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173634451
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/IEventHubReceiver.java ---
    @@ -0,0 +1,78 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.core;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +
    +import com.microsoft.azure.eventhubs.EventData;
    +import com.microsoft.azure.eventhubs.EventHubException;
    +
    +/**
    + * EventHub based Receiver contracts
    + *
    + */
    +public interface IEventHubReceiver {
    +
    +	/**
    +	 * Open / Establish connection to Eventhub given filters. The partition to
    +	 * receive events from will be specified in an implementation specific way.
    +	 * 
    +	 * @param filter
    +	 *            offset or timestamp based filter
    +	 * @throws EventHubException
    +	 * @see {@link IEventFilter} {@link OffsetFilter} {@link TimestampFilter}
    +	 */
    +	void open(IEventFilter filter) throws IOException, EventHubException ;
    +
    +	/**
    +	 * Cleanup and close connection to Eventhub
    +	 */
    +	void close();
    +
    +	/**
    +	 * Check if connection to eventhub is active
    +	 * 
    +	 * @return
    +	 */
    +	boolean isOpen();
    +
    +	/**
    +	 * Receive 'one' event from EventHub for processing from a target partition
    --- End diff --
    
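    This doesn't seem right. The implementation receives a batch of up to `getReceiveEventsMaxCount()` events per call (see `receiver.receive(config.getReceiveEventsMaxCount())` in `PartitionManager.fill()`), not a single event.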
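A sketch of how the `receive` contract could be documented, using a hypothetical generic `BatchReceiver<T>` and an in-memory implementation in place of the real `IEventHubReceiver` and `EventData`. A call returns at most `maxCount` events, and possibly fewer or none.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified receiver contract.
interface BatchReceiver<T> {
    /**
     * Receives up to {@code maxCount} events from the target partition.
     *
     * @param maxCount upper bound on the number of events returned by this call
     * @return the received events; may contain fewer than maxCount, or none
     */
    Iterable<T> receive(int maxCount);
}

// In-memory implementation, only to illustrate the batching semantics.
final class InMemoryReceiver<T> implements BatchReceiver<T> {
    private final Deque<T> buffered = new ArrayDeque<>();

    InMemoryReceiver(List<T> events) {
        buffered.addAll(events);
    }

    @Override
    public Iterable<T> receive(int maxCount) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxCount && !buffered.isEmpty()) {
            batch.add(buffered.poll());
        }
        return batch;
    }
}
```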


---

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173635910
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java ---
    @@ -17,246 +17,281 @@
      *******************************************************************************/
     package org.apache.storm.eventhubs.spout;
     
    -import com.google.common.base.Strings;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +
    +import org.apache.commons.lang.StringUtils;
     import org.apache.storm.Config;
    +import org.apache.storm.eventhubs.core.EventHubConfig;
    +import org.apache.storm.eventhubs.core.EventHubMessage;
    +import org.apache.storm.eventhubs.core.EventHubReceiverImpl;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.core.IEventHubReceiver;
    +import org.apache.storm.eventhubs.core.IEventHubReceiverFactory;
    +import org.apache.storm.eventhubs.core.IPartitionCoordinator;
    +import org.apache.storm.eventhubs.core.IPartitionManager;
    +import org.apache.storm.eventhubs.core.IPartitionManagerFactory;
    +import org.apache.storm.eventhubs.core.MessageId;
    +import org.apache.storm.eventhubs.core.PartitionManager;
    +import org.apache.storm.eventhubs.core.StaticPartitionCoordinator;
    +import org.apache.storm.eventhubs.state.IStateStore;
    +import org.apache.storm.eventhubs.state.ZookeeperStateStore;
     import org.apache.storm.metric.api.IMetric;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.HashMap;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.UUID;
    -
    -public class EventHubSpout extends BaseRichSpout {
    -
    -  private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class);
    -
    -  private final UUID instanceId;
    -  private final EventHubSpoutConfig eventHubConfig;
    -  private final IEventDataScheme scheme;
    -  private final int checkpointIntervalInSeconds;
    -
    -  private IStateStore stateStore;
    -  private IPartitionCoordinator partitionCoordinator;
    -  private IPartitionManagerFactory pmFactory;
    -  private IEventHubReceiverFactory recvFactory;
    -  private SpoutOutputCollector collector;
    -  private long lastCheckpointTime;
    -  private int currentPartitionIndex = -1;
    -
    -  public EventHubSpout(String username, String password, String namespace,
    -      String entityPath, int partitionCount) {
    -    this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount));
    -  }
    -
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig) {
    -    this(spoutConfig, null, null, null);
    -  }
    -  
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig,
    -      IStateStore store,
    -      IPartitionManagerFactory pmFactory,
    -      IEventHubReceiverFactory recvFactory) {
    -    this.eventHubConfig = spoutConfig;
    -    this.scheme = spoutConfig.getEventDataScheme();
    -    this.instanceId = UUID.randomUUID();
    -    this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds();
    -    this.lastCheckpointTime = System.currentTimeMillis();
    -    stateStore = store;
    -    this.pmFactory = pmFactory;
    -    if(this.pmFactory == null) {
    -      this.pmFactory = new IPartitionManagerFactory() {
    -        @Override
    -        public IPartitionManager create(EventHubSpoutConfig spoutConfig,
    -            String partitionId, IStateStore stateStore,
    -            IEventHubReceiver receiver) {
    -          return new PartitionManager(spoutConfig, partitionId,
    -              stateStore, receiver);
    -        }
    -      };
    +import com.google.common.base.Strings;
    +
    +/**
    + * Emit tuples (messages) from an Azure EventHub
    + */
    +public final class EventHubSpout extends BaseRichSpout {
    +
    +    private static final long serialVersionUID = -8460916098313963614L;
    +
    +    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubSpout.class);
    +
    +    private final EventHubSpoutConfig eventHubConfig;
    +    private final int checkpointIntervalInSeconds;
    +
    +    private IStateStore stateStore;
    +    private IPartitionCoordinator partitionCoordinator;
    +    private IPartitionManagerFactory pmFactory;
    +    private IEventHubReceiverFactory recvFactory;
    +    private SpoutOutputCollector collector;
    +    private long lastCheckpointTime;
    +    private int currentPartitionIndex = -1;
    +
    +    public EventHubSpout(
    +            final String username,
    +            final String password,
    +            final String namespace,
    +            final String entityPath,
    +            final int partitionCount) {
    +        this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount));
         }
    -    this.recvFactory = recvFactory;
    -    if(this.recvFactory == null) {
    -      this.recvFactory = new IEventHubReceiverFactory() {
    -        @Override
    -        public IEventHubReceiver create(EventHubSpoutConfig spoutConfig,
    -            String partitionId) {
    -          return new EventHubReceiverImpl(spoutConfig, partitionId);
    -        }
    -      };
    +
    +    public EventHubSpout(
    +            final String username,
    +            final String password,
    +            final String namespace,
    +            final String entityPath,
    +            final int partitionCount,
    +            final int batchSize) {
    +        this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount, batchSize));
         }
    -    
    -  }
    -  
    -  /**
    -   * This is a extracted method that is easy to test
    -   * @param config
    -   * @param totalTasks
    -   * @param taskIndex
    -   * @param collector
    -   * @throws Exception
    -   */
    -  public void preparePartitions(Map<String, Object> config, int totalTasks, int taskIndex, SpoutOutputCollector collector) throws Exception {
    -    this.collector = collector;
    -    if(stateStore == null) {
    -      String zkEndpointAddress = eventHubConfig.getZkConnectionString();
    -      if (zkEndpointAddress == null || zkEndpointAddress.length() == 0) {
    -        //use storm's zookeeper servers if not specified.
    -        @SuppressWarnings("unchecked")
    -        List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS);
    -        Integer zkPort = ((Number) config.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    -        StringBuilder sb = new StringBuilder();
    -        for (String zk : zkServers) {
    -          if (sb.length() > 0) {
    -            sb.append(',');
    -          }
    -          sb.append(zk+":"+zkPort);
    +
    +    public EventHubSpout(final EventHubSpoutConfig spoutConfig) {
    +        this(spoutConfig, null, null, null);
    +    }
    +
    +    public EventHubSpout(
    +            final EventHubSpoutConfig spoutConfig,
    +            final IStateStore store,
    +            final IPartitionManagerFactory pmFactory,
    +            final IEventHubReceiverFactory recvFactory) {
    +        this.eventHubConfig = spoutConfig;
    +        this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds();
    +        this.lastCheckpointTime = System.currentTimeMillis();
    --- End diff --
    
    Nit: You will have an easier time testing this if you use `Time.currentTimeMillis()` from `org.apache.storm.utils.Time` (which supports Storm's time simulation) instead of `System.currentTimeMillis()`.
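The idea behind the suggestion, sketched without Storm on the classpath. If the checkpoint decision reads time through a controllable clock, a test can advance time instead of sleeping. `CheckpointTimer` and the injected `LongSupplier` clock are hypothetical; in the spout itself, Storm's `Time.currentTimeMillis()` gives the same effect when a test runs with simulated time.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of checkpoint-interval logic driven by an injected clock.
final class CheckpointTimer {
    private final LongSupplier clock;
    private final long intervalMillis;
    private long lastCheckpointTime;

    CheckpointTimer(LongSupplier clock, int checkpointIntervalInSeconds) {
        this.clock = clock;
        this.intervalMillis = checkpointIntervalInSeconds * 1000L;
        this.lastCheckpointTime = clock.getAsLong();
    }

    // Returns true, and restarts the interval, once at least intervalMillis have elapsed.
    boolean checkpointIfDue() {
        long now = clock.getAsLong();
        if (now - lastCheckpointTime >= intervalMillis) {
            lastCheckpointTime = now;
            return true;
        }
        return false;
    }
}
```

Production code would pass `System::currentTimeMillis` (or Storm's `Time::currentTimeMillis`); a test passes a clock it can advance.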


---

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173635720
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/format/EventHubMessageDataScheme.java ---
    @@ -0,0 +1,54 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.format;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.apache.storm.eventhubs.core.EventHubMessage;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.tuple.Fields;
    +
    +import com.microsoft.azure.eventhubs.EventData;
    +
    +/**
    + * This scheme constructs an {@link EventHubMessage} object from the received
    --- End diff --
    
    The class doesn't construct an EventHubMessage though?
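The PR summary describes grouping the event payload and audit properties into a single type and exposing it as the only tuple field. If that is the intent, a scheme that actually constructs the message might look roughly like the sketch below. Every type here is a hypothetical stand-in: `FakeEventData` replaces the Azure SDK's `EventData`, `EventHubMessageSketch` replaces `EventHubMessage`, and the `"message"` field name is assumed. A real scheme would presumably also declare that field through Storm's `Fields`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the Azure SDK's EventData: a body plus application properties. */
final class FakeEventData {
    final byte[] body;
    final Map<String, Object> properties;

    FakeEventData(byte[] body, Map<String, Object> properties) {
        this.body = body;
        this.properties = properties;
    }
}

/** Hypothetical message type: payload and audit properties travel as one object. */
final class EventHubMessageSketch {
    final String payload;
    final Map<String, Object> properties;

    EventHubMessageSketch(String payload, Map<String, Object> properties) {
        this.payload = payload;
        this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
    }
}

/** A scheme that really builds the message object and emits it as the single tuple value. */
final class MessageDataSchemeSketch {
    static final String OUTPUT_FIELD = "message"; // assumed field name

    List<Object> deserialize(FakeEventData eventData) {
        String payload = new String(eventData.body, StandardCharsets.UTF_8);
        EventHubMessageSketch message = new EventHubMessageSketch(payload, eventData.properties);
        return Collections.singletonList(message);
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("source", "sensor-1");
        FakeEventData data = new FakeEventData("hello".getBytes(StandardCharsets.UTF_8), props);
        List<Object> values = new MessageDataSchemeSketch().deserialize(data);
        EventHubMessageSketch msg = (EventHubMessageSketch) values.get(0);
        System.out.println(values.size() + " " + msg.payload + " " + msg.properties.get("source"));
    }
}
```

Emitting a single object keeps the tuple schema stable as new properties are added. The trade-off is that downstream bolts must know the message type.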


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173635150
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/Partitions.java ---
    @@ -25,17 +25,18 @@
      * Represents all EventHub partitions a spout is receiving messages from.
      */
     public class Partitions implements Serializable {
    -  private static final long serialVersionUID = 1L;
    -  private List<Partition> partitionList;
    -  public Partitions() {
    -    partitionList = new ArrayList<Partition>();
    -  }
    -  
    -  public void addPartition(Partition partition) {
    -    partitionList.add(partition);
    -  }
    -  
    -  public List<Partition> getPartitions() {
    -    return partitionList;
    -  }
    +    private static final long serialVersionUID = 1L;
    +    private List<Partition> partitionList;
    --- End diff --
    
    Why does it make sense to have this class, rather than just using the `List<Partition>` directly?
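Here is a rough sketch of the alternative. It assumes the wrapper exists only to make the partition list `Serializable`. `java.util.ArrayList` already implements `Serializable`, so a spout can hold a `List<Partition>` field directly. The `Partition` class and the `assign` helper below are simplified, hypothetical stand-ins for the PR's types.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the PR's Partition type. */
final class Partition implements Serializable {
    private static final long serialVersionUID = 1L;
    final String id;

    Partition(String id) {
        this.id = id;
    }
}

final class PartitionListSketch {
    /** Hypothetical helper: builds partition ids 0..count-1 with no wrapper class. */
    static List<Partition> assign(int partitionCount) {
        List<Partition> partitions = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new Partition(Integer.toString(i)));
        }
        return partitions;
    }

    /** Serializes the plain list, showing that ArrayList needs no wrapper to ship with a spout. */
    static int serializedSize(List<Partition> partitions) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(partitions);
        }
        return bytes.size();
    }

    public static void main(String[] args) throws IOException {
        List<Partition> partitions = assign(4);
        System.out.println(partitions.size() + " partitions, serialized bytes: "
                + serializedSize(partitions));
    }
}
```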


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173633130
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/EventHubConfig.java ---
    @@ -0,0 +1,342 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.core;
    +
    +import java.io.Serializable;
    +
    +import org.apache.storm.eventhubs.format.EventHubMessageDataScheme;
    +import org.apache.storm.eventhubs.format.IEventDataScheme;
    +import org.apache.storm.eventhubs.format.StringEventDataScheme;
    +
    +import com.microsoft.azure.eventhubs.EventHubClient;
    +import com.microsoft.azure.eventhubs.PartitionReceiver;
    +import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
    +
    +/**
    + * Captures connection details for EventHub
    + */
    +public class EventHubConfig implements Serializable {
    +    private static final long serialVersionUID = -2913928074769667240L;
    +    protected String userName;
    +    protected String password;
    +    protected String namespace;
    +    protected String entityPath;
    +    protected int partitionCount;
    +    protected String zkConnectionString = null;
    +    protected int checkpointIntervalInSeconds = 10;
    +    protected int receiverCredits = 1024;
    +    protected int maxPendingMsgsPerPartition = FieldConstants.DEFAULT_MAX_PENDING_PER_PARTITION;
    +    protected int receiveEventsMaxCount = FieldConstants.DEFAULT_RECEIVE_MAX_CAP;
    +    protected int prefetchCount = FieldConstants.DEFAULT_PREFETCH_COUNT;
    +    protected long enqueueTimeFilter = 0;
    +    protected String connectionString;
    +    protected String topologyName;
    +    protected IEventDataScheme eventDataScheme = new StringEventDataScheme();
    +    protected String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
    +
    +    public EventHubConfig(String namespace, String entityPath, String userName, String password, int partitionCount) {
    +        this.namespace = namespace;
    +        this.entityPath = entityPath;
    +        this.userName = userName;
    +        this.password = password;
    +        this.partitionCount = partitionCount;
    +        this.connectionString = new ConnectionStringBuilder()
    +                .setNamespaceName(namespace)
    +                .setEventHubName(entityPath)
    +                .setSasKeyName(userName)
    +                .setSasKey(password)
    +                .toString();
    +    }
    +
    +    /**
    +     * Returns username used in credentials provided to EventHub
    +     *
    +     * @return username
    +     */
    +    public String getUserName() {
    +        return userName;
    +    }
    +
    +    /**
    +     * Returns password used in credentials provided to EventHub
    +     *
    +     * @return password
    +     */
    +    public String getPassword() {
    +        return password;
    +    }
    +
    +    /**
    +     * Returns servicebus namespace used when connecting to EventHub
    +     *
    +     * @return servicebus namespace
    +     */
    +    public String getNamespace() {
    +        return namespace;
    +    }
    +
    +    /**
    +     * Returns name of the EventHub
    +     *
    +     * @return EventHub name
    +     */
    +    public String getEntityPath() {
    +        return entityPath;
    +    }
    +
    +    /**
    +     * Returns specified partition count on the EventHub
    +     *
    +     * @return partition count
    +     */
    +    public int getPartitionCount() {
    +        return partitionCount;
    +    }
    +
    +    /**
    +     * Sets the zookeeper connection string. (Example:
    +     * zk1-clusterfqdn:2181,zk2-clusterfqdn:2181)
    +     *
    +     * @param zkConnectionString Zookeeper connection string
    +     */
    +    public void setZkConnectionString(String zkConnectionString) {
    +        this.zkConnectionString = zkConnectionString;
    +    }
    +
    +    /**
    +     * Returns the configured zookeeper connection string.
    +     */
    +    public String getZkConnectionString() {
    +        return zkConnectionString;
    +    }
    +
    +    /**
    +     * Returns the specified frequency interval at which checkpoint information is
    +     * persisted.
    +     *
    +     * @return checkpoint interval
    +     */
    +    public int getCheckpointIntervalInSeconds() {
    +        return checkpointIntervalInSeconds;
    +    }
    +
    +    /**
    +     * Sets the frequency with which checkpoint information is persisted to
    +     * zookeeper
    +     *
    +     * @param checkpointIntervalInSeconds
    +     */
    +    public void setCheckpointIntervalInSeconds(int checkpointIntervalInSeconds) {
    +        this.checkpointIntervalInSeconds = checkpointIntervalInSeconds;
    +    }
    +
    +    /**
    +     * Returns configured receivercredits used when connecting to EventHub Note:
    +     * <p>
    +     * This is a legacy setting that will soon be deprecated. Please use the
    +     * {@link EventHubConfig#setReceiveEventsMaxCount(int)} instead.
    +     * </p>
    +     *
    +     * @return
    +     * @deprecated
    +     */
    +    public int getReceiverCredits() {
    +        return receiverCredits;
    +    }
    +
    +    /**
    +     * Configures receivercredits used when connecting to EventHub
    +     * <p>
    +     * Note: This is a legacy setting that will soon be deprecated. Please use the
    +     * {@link EventHubConfig#setReceiveEventsMaxCount(int)} instead.
    +     * </p>
    +     *
    +     * @deprecated
    +     */
    +    public void setReceiverCredits(int receiverCredits) {
    +        this.receiverCredits = receiverCredits;
    +    }
    +
    +    /**
    +     * Returns the configured the size of the pending queue for each partition.
    +     * While the pending queue is at full capacity no new receive calls will be made
    +     * to EventHub. The default value for it is
    +     * {@link FieldConstants#DEFAULT_MAX_PENDING_PER_PARTITION}
    +     *
    +     * @return
    +     */
    +    public int getMaxPendingMsgsPerPartition() {
    +        return maxPendingMsgsPerPartition;
    +    }
    +
    +    /**
    +     * configured the size of the pending queue for each partition. While the
    +     * pending queue is at full capacity no new receive calls will be made to
    +     * EventHub. The default value for it is
    +     * {@link FieldConstants#DEFAULT_MAX_PENDING_PER_PARTITION}
    +     *
    +     * @param maxPendingMsgsPerPartition
    +     */
    +    public void setMaxPendingMsgsPerPartition(int maxPendingMsgsPerPartition) {
    +        this.maxPendingMsgsPerPartition = maxPendingMsgsPerPartition;
    +    }
    +
    +    /**
    +     * Returns the configured upper limit on number of events that can be received
    +     * from EventHub per call. Default is
    +     * {@link FieldConstants#DEFAULT_RECEIVE_MAX_CAP}
    +     *
    +     * @return
    +     */
    +    public int getReceiveEventsMaxCount() {
    +        return receiveEventsMaxCount;
    +    }
    +
    +    /**
    +     * Configures the upper limit on number of events that can be received from
    +     * EventHub per call. Default is {@link FieldConstants#DEFAULT_RECEIVE_MAX_CAP}
    +     * <p>
    +     * Setting this to a value greater than one will reduce the number of calls that
    +     * are made to EventHub. The received events are buffered in an internal cache
    +     * and fed to the spout during the nextTuple call.
    +     * </p>
    +     *
    +     * @param receiveEventsMaxCount
    +     * @return
    +     */
    +    public void setReceiveEventsMaxCount(int receiveEventsMaxCount) {
    +        this.receiveEventsMaxCount = receiveEventsMaxCount;
    +    }
    +
    +    /**
    +     * Returns the configured value for the TimeBased filter for when to start
    +     * receiving events from.
    +     *
    +     * @return
    +     */
    +    public long getEnqueueTimeFilter() {
    +        return enqueueTimeFilter;
    +    }
    +
    +    /**
    +     * Configures value for the TimeBased filter for when to start receiving events
    +     * from.
    +     *
    +     * @param enqueueTimeFilter
    +     */
    +    public void setEnqueueTimeFilter(long enqueueTimeFilter) {
    +        this.enqueueTimeFilter = enqueueTimeFilter;
    +    }
    +
    +    /**
    +     * Returns the connection string used when talking to EventHub
    +     *
    +     * @return
    +     */
    +    public String getConnectionString() {
    +        return connectionString;
    +    }
    +
    +    /**
    +     * Configures the connection string to be used when talking to EventHub
    +     *
    +     * @param connectionString
    +     */
    +    public void setConnectionString(String connectionString) {
    +        this.connectionString = connectionString;
    +    }
    +
    +    /**
    +     * Name of the toppology
    +     *
    +     * @return
    +     */
    +    public String getTopologyName() {
    +        return topologyName;
    +    }
    +
    +    /**
    +     * Name of the topology
    +     *
    +     * @param topologyName
    +     */
    +    public void setTopologyName(String topologyName) {
    --- End diff --
    
    I'm pretty sure this property is unnecessary. If you need the topology name, you can get it from the topology config passed to `open`. It should be under this key http://storm.apache.org/releases/1.1.2/javadocs/org/apache/storm/Config.html#TOPOLOGY_NAME
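Here is a minimal sketch, assuming the spout reads the name from the config `Map` passed to `open`. The key is `Config.TOPOLOGY_NAME`, whose value is the string `"topology.name"`. The helper below uses the literal only so it compiles without Storm on the classpath, and `TopologyNameSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

final class TopologyNameSketch {
    // Same string as org.apache.storm.Config.TOPOLOGY_NAME; the literal avoids a Storm dependency here.
    static final String TOPOLOGY_NAME_KEY = "topology.name";

    /** Reads the topology name from the config map handed to the spout's open(). */
    static String topologyName(Map<String, Object> topoConf) {
        Object name = topoConf.get(TOPOLOGY_NAME_KEY);
        if (name == null) {
            throw new IllegalStateException("topology config has no " + TOPOLOGY_NAME_KEY + " entry");
        }
        return name.toString();
    }

    public static void main(String[] args) {
        // Simulates the map a worker would pass to open(); the value is made up.
        Map<String, Object> topoConf = new HashMap<>();
        topoConf.put(TOPOLOGY_NAME_KEY, "eventhub-ingest");
        System.out.println(topologyName(topoConf));
    }
}
```

Reading the name at `open` time means users cannot set it to a value that disagrees with the name the topology was actually submitted under.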


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173633175
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/EventHubConfig.java ---
    @@ -0,0 +1,342 @@
    +    /**
    +     * Returns configured receivercredits used when connecting to EventHub Note:
    +     * <p>
    +     * This is a legacy setting that will soon be deprecated. Please use the
    +     * {@link EventHubConfig#setReceiveEventsMaxCount(int)} instead.
    +     * </p>
    +     *
    +     * @return
    +     * @deprecated
    --- End diff --
    
    This class looks new, so why add a method that is already deprecated?


---

[GitHub] storm issue #2588: Microsoft Azure EventHubs Storm Spout and Bolt improvemen...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2588
  
    By the way, when you get a chance, please go to https://issues.apache.org/jira and create an issue for tracking these changes, then rename the PR and commit(s) so they contain the issue number. It makes it easier for us to track which branches the changes are applied to, and helps us generate correct release notes. Thanks.


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173636199
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java ---
    @@ -17,246 +17,281 @@
      *******************************************************************************/
     package org.apache.storm.eventhubs.spout;
     
    -import com.google.common.base.Strings;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +
    +import org.apache.commons.lang.StringUtils;
     import org.apache.storm.Config;
    +import org.apache.storm.eventhubs.core.EventHubConfig;
    +import org.apache.storm.eventhubs.core.EventHubMessage;
    +import org.apache.storm.eventhubs.core.EventHubReceiverImpl;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.core.IEventHubReceiver;
    +import org.apache.storm.eventhubs.core.IEventHubReceiverFactory;
    +import org.apache.storm.eventhubs.core.IPartitionCoordinator;
    +import org.apache.storm.eventhubs.core.IPartitionManager;
    +import org.apache.storm.eventhubs.core.IPartitionManagerFactory;
    +import org.apache.storm.eventhubs.core.MessageId;
    +import org.apache.storm.eventhubs.core.PartitionManager;
    +import org.apache.storm.eventhubs.core.StaticPartitionCoordinator;
    +import org.apache.storm.eventhubs.state.IStateStore;
    +import org.apache.storm.eventhubs.state.ZookeeperStateStore;
     import org.apache.storm.metric.api.IMetric;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.HashMap;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.UUID;
    -
    -public class EventHubSpout extends BaseRichSpout {
    -
    -  private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class);
    -
    -  private final UUID instanceId;
    -  private final EventHubSpoutConfig eventHubConfig;
    -  private final IEventDataScheme scheme;
    -  private final int checkpointIntervalInSeconds;
    -
    -  private IStateStore stateStore;
    -  private IPartitionCoordinator partitionCoordinator;
    -  private IPartitionManagerFactory pmFactory;
    -  private IEventHubReceiverFactory recvFactory;
    -  private SpoutOutputCollector collector;
    -  private long lastCheckpointTime;
    -  private int currentPartitionIndex = -1;
    -
    -  public EventHubSpout(String username, String password, String namespace,
    -      String entityPath, int partitionCount) {
    -    this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount));
    -  }
    -
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig) {
    -    this(spoutConfig, null, null, null);
    -  }
    -  
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig,
    -      IStateStore store,
    -      IPartitionManagerFactory pmFactory,
    -      IEventHubReceiverFactory recvFactory) {
    -    this.eventHubConfig = spoutConfig;
    -    this.scheme = spoutConfig.getEventDataScheme();
    -    this.instanceId = UUID.randomUUID();
    -    this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds();
    -    this.lastCheckpointTime = System.currentTimeMillis();
    -    stateStore = store;
    -    this.pmFactory = pmFactory;
    -    if(this.pmFactory == null) {
    -      this.pmFactory = new IPartitionManagerFactory() {
    -        @Override
    -        public IPartitionManager create(EventHubSpoutConfig spoutConfig,
    -            String partitionId, IStateStore stateStore,
    -            IEventHubReceiver receiver) {
    -          return new PartitionManager(spoutConfig, partitionId,
    -              stateStore, receiver);
    -        }
    -      };
    +import com.google.common.base.Strings;
    +
    +/**
    + * Emit tuples (messages) from an Azure EventHub
    + */
    +public final class EventHubSpout extends BaseRichSpout {
    +
    +    private static final long serialVersionUID = -8460916098313963614L;
    +
    +    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubSpout.class);
    +
    +    private final EventHubSpoutConfig eventHubConfig;
    +    private final int checkpointIntervalInSeconds;
    +
    +    private IStateStore stateStore;
    +    private IPartitionCoordinator partitionCoordinator;
    +    private IPartitionManagerFactory pmFactory;
    +    private IEventHubReceiverFactory recvFactory;
    +    private SpoutOutputCollector collector;
    +    private long lastCheckpointTime;
    +    private int currentPartitionIndex = -1;
    +
    +    public EventHubSpout(
    +            final String username,
    +            final String password,
    +            final String namespace,
    +            final String entityPath,
    +            final int partitionCount) {
    +        this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount));
         }
    -    this.recvFactory = recvFactory;
    -    if(this.recvFactory == null) {
    -      this.recvFactory = new IEventHubReceiverFactory() {
    -        @Override
    -        public IEventHubReceiver create(EventHubSpoutConfig spoutConfig,
    -            String partitionId) {
    -          return new EventHubReceiverImpl(spoutConfig, partitionId);
    -        }
    -      };
    +
    +    public EventHubSpout(
    +            final String username,
    +            final String password,
    +            final String namespace,
    +            final String entityPath,
    +            final int partitionCount,
    +            final int batchSize) {
    +        this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount, batchSize));
         }
    -    
    -  }
    -  
    -  /**
    -   * This is a extracted method that is easy to test
    -   * @param config
    -   * @param totalTasks
    -   * @param taskIndex
    -   * @param collector
    -   * @throws Exception
    -   */
    -  public void preparePartitions(Map<String, Object> config, int totalTasks, int taskIndex, SpoutOutputCollector collector) throws Exception {
    -    this.collector = collector;
    -    if(stateStore == null) {
    -      String zkEndpointAddress = eventHubConfig.getZkConnectionString();
    -      if (zkEndpointAddress == null || zkEndpointAddress.length() == 0) {
    -        //use storm's zookeeper servers if not specified.
    -        @SuppressWarnings("unchecked")
    -        List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS);
    -        Integer zkPort = ((Number) config.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    -        StringBuilder sb = new StringBuilder();
    -        for (String zk : zkServers) {
    -          if (sb.length() > 0) {
    -            sb.append(',');
    -          }
    -          sb.append(zk+":"+zkPort);
    +
    +    public EventHubSpout(final EventHubSpoutConfig spoutConfig) {
    +        this(spoutConfig, null, null, null);
    +    }
    +
    +    public EventHubSpout(
    +            final EventHubSpoutConfig spoutConfig,
    +            final IStateStore store,
    +            final IPartitionManagerFactory pmFactory,
    +            final IEventHubReceiverFactory recvFactory) {
    +        this.eventHubConfig = spoutConfig;
    +        this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds();
    +        this.lastCheckpointTime = System.currentTimeMillis();
    +        stateStore = store;
    +        this.pmFactory = pmFactory;
    +        if (this.pmFactory == null) {
    +            this.pmFactory = new IPartitionManagerFactory() {
    +                private static final long serialVersionUID = -3134660797825594845L;
    +
    +                @Override
    +                public IPartitionManager create(EventHubConfig ehConfig, String partitionId, IStateStore stateStore,
    +                                                IEventHubReceiver receiver) {
    +                    return new PartitionManager(spoutConfig, partitionId, stateStore, receiver);
    +                }
    +            };
    +        }
    +        this.recvFactory = recvFactory;
    +        if (this.recvFactory == null) {
    +            this.recvFactory = new IEventHubReceiverFactory() {
    +
    +                private static final long serialVersionUID = 7215384402396274196L;
    +
    +                @Override
    +                public IEventHubReceiver create(EventHubConfig spoutConfig, String partitionId) {
    +                    return new EventHubReceiverImpl(spoutConfig, partitionId);
    +                }
    +
    +            };
             }
    -        zkEndpointAddress = sb.toString();
    -      }
    -      stateStore = new ZookeeperStateStore(zkEndpointAddress,
    -          Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()),
    -          Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL).toString()));
         }
    -    stateStore.open();
     
    -    partitionCoordinator = new StaticPartitionCoordinator(
    -        eventHubConfig, taskIndex, totalTasks, stateStore, pmFactory, recvFactory);
    +    /**
    +     * This is a extracted method that is easy to test
    +     *
    +     * @param config
    +     * @param totalTasks
    +     * @param taskIndex
    +     * @param collector
    +     * @throws Exception
    +     */
    +    public void preparePartitions(
    +            final Map config,
    +            final int totalTasks,
    +            final int taskIndex,
    +            final SpoutOutputCollector collector) throws Exception {
    +        this.collector = collector;
    +        if (stateStore == null) {
    +            String zkEndpointAddress = eventHubConfig.getZkConnectionString();
    +            if (StringUtils.isBlank(zkEndpointAddress)) {
    +                @SuppressWarnings("unchecked")
    +                List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS);
    +                Integer zkPort = ((Number) config.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    +                zkEndpointAddress = String.join(",",
    +                        zkServers.stream().map(x -> x + ":" + zkPort).collect(Collectors.toList()));
    +            }
     
    -    for (IPartitionManager partitionManager : 
    -      partitionCoordinator.getMyPartitionManagers()) {
    -      partitionManager.open();
    +            stateStore = new ZookeeperStateStore(zkEndpointAddress,
    +                    Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()),
    --- End diff --
    
    Pretty sure these properties are `Number`s. Consider casting to `Number` and using `intValue()`.
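Here is a small sketch of the suggested change. It assumes the retry settings arrive as some `Number` subtype; a config that has been round-tripped through JSON can hold `Long` rather than `Integer`. For that reason the helper casts to `Number` rather than to a concrete boxed type. `ZkConfigSketch.getInt` is a hypothetical helper. The keys are the string values of `Config.STORM_ZOOKEEPER_RETRY_TIMES` and `Config.STORM_ZOOKEEPER_RETRY_INTERVAL`.

```java
import java.util.HashMap;
import java.util.Map;

final class ZkConfigSketch {
    /** Reads an int-valued setting, accepting any Number (Integer, Long, ...) stored in the map. */
    static int getInt(Map<String, Object> conf, String key) {
        Object value = conf.get(key);
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException(key + " should be a Number but was " + value);
        }
        return ((Number) value).intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        // A Long here would break a direct (Integer) cast, but not a Number cast.
        conf.put("storm.zookeeper.retry.times", 5L);
        conf.put("storm.zookeeper.retry.interval", 1000);
        System.out.println(getInt(conf, "storm.zookeeper.retry.times") + " "
                + getInt(conf, "storm.zookeeper.retry.interval")); // prints "5 1000"
    }
}
```

Unlike `Integer.parseInt(value.toString())`, this avoids a round trip through a string. A value of the wrong type fails with a message that names the offending key.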


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173631260
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java ---
    @@ -31,116 +30,131 @@
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.Map;
    -import java.util.concurrent.ExecutionException;
    -
    -import java.util.Map;
    -import java.util.concurrent.ExecutionException;
    +import com.microsoft.azure.eventhubs.EventData;
    +import com.microsoft.azure.eventhubs.EventHubClient;
    +import com.microsoft.azure.eventhubs.PartitionSender;
    +import com.microsoft.azure.eventhubs.EventHubException;
     
     /**
      * A bolt that writes event message to EventHub.
    + * <p>
    + * <p>
    + * The implementation has two modes of operation:
    + * <ul>
    + * <li>partitionmode = true, one bolt per partition write.</li>
    + * <li>partitionmode = false, use default partitioning key strategy to write to
    + * partition(s)</li>
    + * </ul>
    + * </p>
      */
     public class EventHubBolt extends BaseRichBolt {
    -	private static final long serialVersionUID = 1L;
    -	private static final Logger logger = LoggerFactory
    -			.getLogger(EventHubBolt.class);
    -
    -	protected OutputCollector collector;
    -	protected PartitionSender sender;
    -	protected EventHubClient ehClient;
    -	protected EventHubBoltConfig boltConfig;
    -
    -	public EventHubBolt(String connectionString, String entityPath) {
    -		boltConfig = new EventHubBoltConfig(connectionString, entityPath);
    -	}
    -
    -	public EventHubBolt(String userName, String password, String namespace,
    -			String entityPath, boolean partitionMode) {
    -		boltConfig = new EventHubBoltConfig(userName, password, namespace,
    -				entityPath, partitionMode);
    -	}
    -
    -	public EventHubBolt(EventHubBoltConfig config) {
    -		boltConfig = config;
    -	}
    -
    -	@Override
    -	public void prepare(Map<String, Object> config, TopologyContext context,
    -			OutputCollector collector) {
    -		this.collector = collector;
    -		String myPartitionId = null;
    -		if (boltConfig.getPartitionMode()) {
    -			// We can use the task index (starting from 0) as the partition ID
    -			myPartitionId = "" + context.getThisTaskIndex();
    -		}
    -		logger.info("creating sender: " + boltConfig.getConnectionString()
    -				+ ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
    -		try {
    -			ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
    -			if (boltConfig.getPartitionMode()) {
    -				sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
    -			}
    -		} catch (Exception ex) {
    -			collector.reportError(ex);
    -			throw new RuntimeException(ex);
    -		}
    -
    -	}
    -
    -	@Override
    -	public void execute(Tuple tuple) {
    -		try {
    -			EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple));
    -			if (boltConfig.getPartitionMode() && sender!=null) {
    -				sender.sendSync(sendEvent);
    -			}
    -			else if (boltConfig.getPartitionMode() && sender==null) {
    -				throw new EventHubException("Sender is null");
    -			}
    -			else if (!boltConfig.getPartitionMode() && ehClient!=null) {
    -				ehClient.sendSync(sendEvent);
    -			}
    -			else if (!boltConfig.getPartitionMode() && ehClient==null) {
    -				throw new EventHubException("ehclient is null");
    -			}
    -			collector.ack(tuple);
    -		} catch (EventHubException ex ) {
    -			collector.reportError(ex);
    -			collector.fail(tuple);
    -		} catch (ServiceBusException e) {
    -			collector.reportError(e);
    -			collector.fail(tuple);
    -		}
    -	}
    -
    -	@Override
    -	public void cleanup() {
    -		if(sender != null) {
    -			try {
    -				sender.close().whenComplete((voidargs,error)->{
    -					try{
    -						if(error!=null){
    -							logger.error("Exception during sender cleanup phase"+error.toString());
    -						}
    -						ehClient.closeSync();
    -					}catch (Exception e){
    -						logger.error("Exception during ehclient cleanup phase"+e.toString());
    -					}
    -				}).get();
    -			} catch (InterruptedException e) {
    -				logger.error("Exception occured during cleanup phase"+e.toString());
    -			} catch (ExecutionException e) {
    -				logger.error("Exception occured during cleanup phase"+e.toString());
    -			}
    -			logger.info("Eventhub Bolt cleaned up");
    -			sender = null;
    -			ehClient =  null;
    -		}
    -	}
    -
    -	@Override
    -	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    -
    -	}
    -
    +    private static final long serialVersionUID = 1L;
    +    private static final Logger logger = LoggerFactory.getLogger(EventHubBolt.class);
    +
    +    private ExecutorService executorService;
    +    protected OutputCollector collector;
    +    protected EventHubClient ehClient;
    +    protected PartitionSender sender;
    +    protected EventHubBoltConfig boltConfig;
    +
    +    /**
    +     * Constructs an instance that uses the specified connection string to connect
    +     * to an EventHub and write to the specified entityPath
    +     *
    +     * @param connectionString EventHub connection String
    +     * @param entityPath       entity path to write to
    +     */
    +    public EventHubBolt(String connectionString, String entityPath) {
    +        boltConfig = new EventHubBoltConfig(connectionString, entityPath);
    +    }
    +
    +    /**
    +     * Constructs an instance that connects to an EventHub using the specified
    +     * connection credentials.
    +     *
    +     * @param userName      UserName to connect as
    +     * @param password      Password to use
    +     * @param namespace     target namespace for the service bus
    +     * @param entityPath    Name of the event hub
    +     * @param partitionMode number of partitions
    --- End diff --
    
    This description might need to be updated: partitionMode is a boolean flag, not the number of partitions.
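The class Javadoc in this PR describes the flag's intent: with partitionMode=true each bolt task writes to the partition whose id equals its task index, and with partitionMode=false EventHub's own partitioning decides. The removed `prepare()` code derived the partition id from `context.getThisTaskIndex()`. The following is a hypothetical sketch of that mapping, not code from the PR. `PartitionRouting` and the `partitionCount` parameter are illustrative assumptions.

```java
final class PartitionRouting {
    private PartitionRouting() {
    }

    // Returns the partition id a bolt task should write to, or null when
    // partitionMode is false and EventHub picks the partition itself.
    static String targetPartition(boolean partitionMode, int taskIndex, int partitionCount) {
        if (!partitionMode) {
            return null;
        }
        // Partition mode assumes one bolt task per partition.
        if (taskIndex < 0 || taskIndex >= partitionCount) {
            throw new IllegalStateException("task index " + taskIndex
                    + " has no matching partition; partitionMode requires as many bolt tasks"
                    + " as partitions (" + partitionCount + ")");
        }
        return Integer.toString(taskIndex);
    }
}
```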


---

[GitHub] storm issue #2588: Microsoft Azure EventHubs Storm Spout and Bolt improvemen...

Posted by SreeramGarlapati <gi...@git.apache.org>.
Github user SreeramGarlapati commented on the issue:

    https://github.com/apache/storm/pull/2588
  
    Thanks @srdo, I will take a shot at these changes by EOD Monday PST.


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173635004
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/PartitionManager.java ---
    @@ -0,0 +1,132 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.core;
    +
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.TreeSet;
    +
    +import org.apache.storm.eventhubs.state.IStateStore;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.esotericsoftware.minlog.Log;
    +import com.microsoft.azure.eventhubs.EventData;
    +
    +public class PartitionManager extends SimplePartitionManager {
    +    private static final Logger logger = LoggerFactory.getLogger(PartitionManager.class);
    +
    +    // all sent events are stored in pending
    --- End diff --
    
    Nit: If you call them "pending emitted events" instead of "sent events", it will be clearer what you mean.


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173631895
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java ---
    @@ -17,95 +17,185 @@
      *******************************************************************************/
     package org.apache.storm.eventhubs.bolt;
     
    -import com.microsoft.azure.servicebus.ConnectionStringBuilder;
    -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
    -
     import java.io.Serializable;
    +import java.net.URI;
    +import java.net.URISyntaxException;
     
    -import java.io.Serializable;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.format.DefaultEventDataFormat;
    +import org.apache.storm.eventhubs.format.IEventDataFormat;
    +
    +import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
     
    -/*
    +/**
      * EventHubs bolt configurations
    + * <p>
    + * Partition mode: partitionMode=true, in this mode each bolt task will write to
    + * a partition with the same id as that of the task index. For this mode, the
    + * number of bolt tasks must match the number of partitions.
    + * <p>
    + * partitionMode=false, default setting. There is no affinity between bolt tasks
    + * and partitions. Events are written to partitions as determined by the
    + * EventHub partitioning logic.
      *
    - * Partition mode:
    - * With partitionMode=true you need to create the same number of tasks as the number of 
    - * EventHubs partitions, and each bolt task will only send data to one partition.
    - * The partition ID is the task ID of the bolt.
    - * 
    - * Event format:
    - * The formatter to convert tuple to bytes for EventHubs.
    - * if null, the default format is common delimited tuple fields.
    + * @see IEventDataFormat
      */
     public class EventHubBoltConfig implements Serializable {
    -  private static final long serialVersionUID = 1L;
    -  
    -  private String connectionString;
    -  private final String entityPath;
    -  protected boolean partitionMode;
    -  protected IEventDataFormat dataFormat;
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath) {
    -    this(connectionString, entityPath, false, null);
    -  }
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath,
    -      boolean partitionMode) {
    -    this(connectionString, entityPath, partitionMode, null);
    -  }
    -  
    -  public EventHubBoltConfig(String userName, String password, String namespace,
    -      String entityPath, boolean partitionMode) {
    -    this(userName, password, namespace,
    -        EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
    -  }
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath,
    -      boolean partitionMode, IEventDataFormat dataFormat) {
    -    this.connectionString = connectionString;
    -    this.entityPath = entityPath;
    -    this.partitionMode = partitionMode;
    -    this.dataFormat = dataFormat;
    -    if(this.dataFormat == null) {
    -      this.dataFormat = new DefaultEventDataFormat();
    +    private static final long serialVersionUID = 1L;
    +
    +    private String connectionString;
    +    protected boolean partitionMode;
    +    protected IEventDataFormat dataFormat;
    +
    +    /**
    +     * Constructs an instance with the specified connection string and eventhub name.
    +     * The {@link #partitionMode} is set to false.
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param entityPath       EventHub name
    +     */
    +    public EventHubBoltConfig(final String connectionString, final String entityPath) {
    +        this(connectionString, entityPath, false, null);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified connection string, eventhub name and
    +     * partition mode.
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param entityPath       EventHub name
    +     * @param partitionMode    partitionMode to apply
    +     */
    +    public EventHubBoltConfig(String connectionString, String entityPath, boolean partitionMode) {
    +        this(connectionString, entityPath, partitionMode, null);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified credentials, eventhub name and
    +     * partition mode.
    +     * <p>
    +     * <p>
    +     * For sovereign clouds please use the constructor
    +     * {@link EventHubBolt#EventHubBolt(String, String)}.
    +     * </p>
    +     *
    +     * @param userName      user name to connect as
    +     * @param password      password for the user name
    +     * @param namespace     servicebus namespace
    +     * @param entityPath    EventHub name
    +     * @param partitionMode Dictates write mode. if true will write to specific partitions
    --- End diff --
    
    Nit: This should probably use the same description as in the constructor above.


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173634390
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/IEventHubReceiver.java ---
    @@ -0,0 +1,78 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.core;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +
    +import com.microsoft.azure.eventhubs.EventData;
    +import com.microsoft.azure.eventhubs.EventHubException;
    +
    +/**
    + * EventHub based Receiver contracts
    + *
    + */
    +public interface IEventHubReceiver {
    +
    +	/**
    +	 * Open / Establish connection to Eventhub given filters. The partition to
    --- End diff --
    
    Nit: "with the given filters" would be clearer.


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173632083
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java ---
    @@ -17,95 +17,185 @@
      *******************************************************************************/
     package org.apache.storm.eventhubs.bolt;
     
    -import com.microsoft.azure.servicebus.ConnectionStringBuilder;
    -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
    -
     import java.io.Serializable;
    +import java.net.URI;
    +import java.net.URISyntaxException;
     
    -import java.io.Serializable;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.format.DefaultEventDataFormat;
    +import org.apache.storm.eventhubs.format.IEventDataFormat;
    +
    +import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
     
    -/*
    +/**
      * EventHubs bolt configurations
    + * <p>
    + * Partition mode: partitionMode=true, in this mode each bolt task will write to
    + * a partition with the same id as that of the task index. For this mode, the
    + * number of bolt tasks must match the number of partitions.
    + * <p>
    + * partitionMode=false, default setting. There is no affinity between bolt tasks
    + * and partitions. Events are written to partitions as determined by the
    + * EventHub partitioning logic.
      *
    - * Partition mode:
    - * With partitionMode=true you need to create the same number of tasks as the number of 
    - * EventHubs partitions, and each bolt task will only send data to one partition.
    - * The partition ID is the task ID of the bolt.
    - * 
    - * Event format:
    - * The formatter to convert tuple to bytes for EventHubs.
    - * if null, the default format is common delimited tuple fields.
    + * @see IEventDataFormat
      */
     public class EventHubBoltConfig implements Serializable {
    -  private static final long serialVersionUID = 1L;
    -  
    -  private String connectionString;
    -  private final String entityPath;
    -  protected boolean partitionMode;
    -  protected IEventDataFormat dataFormat;
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath) {
    -    this(connectionString, entityPath, false, null);
    -  }
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath,
    -      boolean partitionMode) {
    -    this(connectionString, entityPath, partitionMode, null);
    -  }
    -  
    -  public EventHubBoltConfig(String userName, String password, String namespace,
    -      String entityPath, boolean partitionMode) {
    -    this(userName, password, namespace,
    -        EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
    -  }
    -  
    -  public EventHubBoltConfig(String connectionString, String entityPath,
    -      boolean partitionMode, IEventDataFormat dataFormat) {
    -    this.connectionString = connectionString;
    -    this.entityPath = entityPath;
    -    this.partitionMode = partitionMode;
    -    this.dataFormat = dataFormat;
    -    if(this.dataFormat == null) {
    -      this.dataFormat = new DefaultEventDataFormat();
    +    private static final long serialVersionUID = 1L;
    +
    +    private String connectionString;
    +    protected boolean partitionMode;
    +    protected IEventDataFormat dataFormat;
    +
    +    /**
    +     * Constructs an instance with the specified connection string and eventhub name.
    +     * The {@link #partitionMode} is set to false.
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param entityPath       EventHub name
    +     */
    +    public EventHubBoltConfig(final String connectionString, final String entityPath) {
    +        this(connectionString, entityPath, false, null);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified connection string, eventhub name and
    +     * partition mode.
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param entityPath       EventHub name
    +     * @param partitionMode    partitionMode to apply
    +     */
    +    public EventHubBoltConfig(String connectionString, String entityPath, boolean partitionMode) {
    +        this(connectionString, entityPath, partitionMode, null);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified credentials, eventhub name and
    +     * partition mode.
    +     * <p>
    +     * <p>
    +     * For sovereign clouds please use the constructor
    +     * {@link EventHubBolt#EventHubBolt(String, String)}.
    +     * </p>
    +     *
    +     * @param userName      user name to connect as
    +     * @param password      password for the user name
    +     * @param namespace     servicebus namespace
    +     * @param entityPath    EventHub name
    +     * @param partitionMode Dictates write mode. if true will write to specific partitions
    +     */
    +    public EventHubBoltConfig(String userName, String password, String namespace, String entityPath,
    +                              boolean partitionMode) {
    +        this(userName, password, namespace, FieldConstants.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
    +    }
    +
    +    /**
    +     * Constructs an instance with specified connection string, and partition mode.
    +     * The specified {@link IEventDataFormat} will be used to format data to bytes
    +     * before
    +     *
    +     * @param connectionString EventHub connection string
    +     * @param partitionMode    Dictates write mode. if true will write to specific partitions
    +     * @param dataFormat       data formatter for serializing event data
    +     */
    +    public EventHubBoltConfig(String connectionString, String entityPath, boolean partitionMode, IEventDataFormat dataFormat) {
    +        this.connectionString = new ConnectionStringBuilder(connectionString)
    +                .setEventHubName(entityPath)
    +                .toString();
    +        this.partitionMode = partitionMode;
    +        this.dataFormat = dataFormat;
    +        if (this.dataFormat == null) {
    +            this.dataFormat = new DefaultEventDataFormat();
    +        }
         }
    -  }
    -  
    -  public EventHubBoltConfig(String userName, String password, String namespace,
    -      String targetFqnAddress, String entityPath) {
    -    this(userName, password, namespace, targetFqnAddress, entityPath, false, null);
    -  }
    -  
    -  public EventHubBoltConfig(String userName, String password, String namespace,
    -      String targetFqnAddress, String entityPath, boolean partitionMode) {
    -    this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null);
    -  }
    -  
    -  public EventHubBoltConfig(String userName, String password, String namespace,
    -      String targetFqnAddress, String entityPath, boolean partitionMode,
    -      IEventDataFormat dataFormat) {
    -    this.connectionString = new ConnectionStringBuilder(namespace,entityPath,
    -            userName,password).toString();
    -    this.entityPath = entityPath;
    -    this.partitionMode = partitionMode;
    -    this.dataFormat = dataFormat;
    -    if(this.dataFormat == null) {
    -      this.dataFormat = new DefaultEventDataFormat();
    +
    +    /**
    +     * Constructs an instance with specified credentials, and connection information
    +     *
    +     * @param userName   user name to connect as
    +     * @param password   password for the user name
    +     * @param namespace  servicebus namespace
    +     * @param fqdnSuffix FQDN suffix for the servicebus entity url. servicebus.windows.net)
    --- End diff --
    
    Did something get cut out at the end here?
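The fqdnSuffix parameter is the host suffix appended to the namespace, for example servicebus.windows.net. The removed code passed userName and password to `ConnectionStringBuilder(namespace, entityPath, userName, password)` as the shared access key name and key. This is a minimal sketch of how those pieces combine, assuming the standard `Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=...;EntityPath=...` connection string layout. The real constructor uses the EventHubs client's `ConnectionStringBuilder`, and `ConnectionStrings.build` is a hypothetical helper.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class ConnectionStrings {
    private ConnectionStrings() {
    }

    // Builds an EventHubs connection string from SAS credentials and the
    // namespace FQDN suffix (e.g. "servicebus.windows.net" for public Azure).
    static String build(String keyName, String key, String namespace,
                        String fqdnSuffix, String entityPath) {
        final URI endpoint;
        try {
            // Rejects namespace/suffix combinations that are not valid URI syntax.
            endpoint = new URI("sb://" + namespace + "." + fqdnSuffix);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("invalid namespace or FQDN suffix", e);
        }
        return "Endpoint=" + endpoint + "/"
                + ";SharedAccessKeyName=" + keyName
                + ";SharedAccessKey=" + key
                + ";EntityPath=" + entityPath;
    }
}
```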


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173635542
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/PartitionManager.java ---
    @@ -0,0 +1,132 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +package org.apache.storm.eventhubs.core;
    +
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.TreeSet;
    +
    +import org.apache.storm.eventhubs.state.IStateStore;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.esotericsoftware.minlog.Log;
    +import com.microsoft.azure.eventhubs.EventData;
    +
    +public class PartitionManager extends SimplePartitionManager {
    +    private static final Logger logger = LoggerFactory.getLogger(PartitionManager.class);
    +
    +    // all sent events are stored in pending
    +    private final Map<String, EventHubMessage> pending;
    +
    +    // all failed events are put in toResend, which is sorted by event's offset
    +    private final TreeSet<EventHubMessage> toResend;
    +
    +    private final TreeSet<EventHubMessage> waitingToEmit;
    +
    +    public PartitionManager(EventHubConfig ehConfig, String partitionId, IStateStore stateStore,
    +                            IEventHubReceiver receiver) {
    +        super(ehConfig, partitionId, stateStore, receiver);
    +
    +        this.pending = new LinkedHashMap<String, EventHubMessage>();
    +        this.toResend = new TreeSet<EventHubMessage>();
    +        this.waitingToEmit = new TreeSet<EventHubMessage>();
    +    }
    +
    +    private void fill() {
    +        Iterable<EventData> receivedEvents = receiver.receive(config.getReceiveEventsMaxCount());
    +        if (receivedEvents == null || receivedEvents.spliterator().getExactSizeIfKnown() == 0) {
    +            logger.debug("No messages received from EventHub.");
    +            return;
    +        }
    +
    +        String startOffset = null;
    +        String endOffset = null;
    +        for (EventData ed : receivedEvents) {
    +            EventHubMessage ehm = new EventHubMessage(ed, partitionId);
    +            startOffset = (startOffset == null) ? ehm.getOffset() : startOffset;
    +            endOffset = ehm.getOffset();
    +            waitingToEmit.add(ehm);
    +        }
    +
    +        logger.debug("Received Messages Start Offset: " + startOffset + ", End Offset: " + endOffset);
    +    }
    +
    +    @Override
    +    public EventHubMessage receive() {
    +        logger.debug("Retrieving messages for partition: " + partitionId);
    +        int countToRetrieve = pending.size() - config.getMaxPendingMsgsPerPartition();
    +
    +        if (countToRetrieve >= 0) {
    +            Log.debug("Pending queue has more than " + config.getMaxPendingMsgsPerPartition()
    +                    + " messages. No new events will be retrieved from EventHub.");
    +            return null;
    +        }
    +
    +        EventHubMessage ehm = null;
    +        if (!toResend.isEmpty()) {
    +            ehm = toResend.pollFirst();
    +        } else {
    +            if (waitingToEmit.isEmpty()) {
    +                fill();
    +            }
    +            ehm = waitingToEmit.pollFirst();
    +        }
    +
    +        if (ehm == null) {
    +            logger.debug("No messages pending or waiting for reprocessing.");
    +            return null;
    +        }
    +
    +        lastOffset = ehm.getOffset();
    +        pending.put(lastOffset, ehm);
    +        return ehm;
    +    }
    +
    +    @Override
    +    public void ack(String offset) {
    +        pending.remove(offset);
    +    }
    +
    +    @Override
    +    public void fail(String offset) {
    +        logger.warn("fail on " + offset);
    +        toResend.add(pending.remove(offset));
    +    }
    +
    +    @Override
    +    protected String getCompletedOffset() {
    --- End diff --
    
    Why does this method make sense? It returns the smallest pending offset rather than the last completed offset, doesn't it? As far as I can tell, this prevents the manager from offering at-least-once processing: if one of the pending offsets is checkpointed and the spout then crashes, you might skip tuples on restart.
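To illustrate the reviewer's point, here is a sketch of one way to compute an at-least-once checkpoint. The idea is to commit only the highest acked offset that lies below every still-pending offset. In that case an offset that was emitted but not yet acked, or that failed and is awaiting resend, is never covered by the checkpoint. `OffsetTracker` is hypothetical and not part of the PR. It also assumes offsets can be ordered as `long` values. In the PR, offsets are Strings, but the `TreeSet` of messages is described as sorted by offset.

```java
import java.util.TreeSet;

// Tracks emitted and acked offsets for one partition and reports the highest
// offset that is safe to checkpoint for at-least-once delivery.
final class OffsetTracker {
    private final TreeSet<Long> pending = new TreeSet<>(); // emitted, not yet acked
    private final TreeSet<Long> acked = new TreeSet<>();   // acked, not yet pruned

    void emitted(long offset) {
        pending.add(offset);
    }

    void acked(long offset) {
        if (pending.remove(offset)) {
            acked.add(offset);
        }
    }

    // A failed offset stays pending until it is re-emitted and acked,
    // so it keeps blocking the checkpoint.
    void failed(long offset) {
        // intentionally a no-op
    }

    // Returns the highest acked offset below every pending offset, or null if
    // nothing is safe to checkpoint yet.
    Long committableOffset() {
        Long candidate = pending.isEmpty()
                ? (acked.isEmpty() ? null : acked.last())
                : acked.lower(pending.first());
        if (candidate != null) {
            // Acked offsets below the candidate are no longer needed.
            acked.headSet(candidate, false).clear();
        }
        return candidate;
    }
}
```

Returning `pending.first()` instead would be unsafe. The receiver is created with `EventPosition.fromOffset(offset, false)`, which excludes the checkpointed offset itself, so a still-pending event at that offset would never be redelivered after a crash.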


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173635092
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/PartitionReceiverFactory.java ---
    @@ -0,0 +1,57 @@
    +/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *******************************************************************************/
    +
    +package org.apache.storm.eventhubs.core;
    +
    +import com.microsoft.azure.eventhubs.EventHubClient;
    +import com.microsoft.azure.eventhubs.EventPosition;
    +import com.microsoft.azure.eventhubs.PartitionReceiver;
    +import com.microsoft.azure.eventhubs.EventHubException;
    +
    +public final class PartitionReceiverFactory {
    +
    +    public static PartitionReceiver createReceiver(EventHubClient ehClient, IEventFilter filter,
    +                                                   EventHubConfig eventHubConfig, String partitionId) throws EventHubException {
    +
    +        if (filter instanceof OffsetFilter) {
    +            return createOffsetReceiver(ehClient, (OffsetFilter) filter, eventHubConfig, partitionId);
    +        } else {
    +            return createTimestampReceiver(ehClient, (TimestampFilter) filter, eventHubConfig, partitionId);
    +        }
    +    }
    +
    +    private static PartitionReceiver createOffsetReceiver(EventHubClient ehClient, OffsetFilter filter,
    +                                                          EventHubConfig eventHubConfig, String partitionId) throws EventHubException {
    +
    +        return ehClient.createEpochReceiverSync(
    +                eventHubConfig.getConsumerGroupName(),
    +                partitionId,
    +                EventPosition.fromOffset(filter.getOffset(), false),
    --- End diff --
    
    I think you would be better off putting this line in the OffsetFilter implementation, and similarly for TimestampFilter. That way you can have IEventFilter declare a getOffset() method instead of having to do the instanceof check in createReceiver.
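    A minimal sketch of the suggested refactoring, assuming each filter builds its own start position. `Position` is a hypothetical stand-in for `com.microsoft.azure.eventhubs.EventPosition`, so the sketch compiles without the Azure client. The method name `getEventPosition()` is also hypothetical; the comment above proposes `getOffset()`. With this shape the factory has a single call path and no cast.

```java
import java.time.Instant;

final class FilterSketch {

    // Hypothetical stand-in for com.microsoft.azure.eventhubs.EventPosition,
    // so this sketch compiles without the Azure client on the classpath.
    static final class Position {
        private final String description;

        private Position(String description) {
            this.description = description;
        }

        static Position fromOffset(String offset, boolean inclusive) {
            return new Position("offset " + offset + (inclusive ? " (inclusive)" : " (exclusive)"));
        }

        static Position fromEnqueuedTime(Instant time) {
            return new Position("enqueued time " + time);
        }

        @Override
        public String toString() {
            return description;
        }
    }

    // Each filter knows how to turn itself into a start position.
    // getEventPosition() is a hypothetical method name.
    interface IEventFilter {
        Position getEventPosition();
    }

    static final class OffsetFilter implements IEventFilter {
        private final String offset;

        OffsetFilter(String offset) {
            this.offset = offset;
        }

        @Override
        public Position getEventPosition() {
            return Position.fromOffset(offset, false);
        }
    }

    static final class TimestampFilter implements IEventFilter {
        private final Instant time;

        TimestampFilter(Instant time) {
            this.time = time;
        }

        @Override
        public Position getEventPosition() {
            return Position.fromEnqueuedTime(time);
        }
    }

    // Factory-side code: one call path, no instanceof check or cast.
    // A real factory would pass the position on to createEpochReceiverSync.
    static String describeReceiver(IEventFilter filter, String partitionId) {
        return "partition " + partitionId + " starting at " + filter.getEventPosition();
    }

    public static void main(String[] args) {
        System.out.println(describeReceiver(new OffsetFilter("42"), "0"));
        System.out.println(describeReceiver(new TimestampFilter(Instant.EPOCH), "1"));
    }
}
```

    Adding a new filter type then only means implementing the interface; the factory does not change.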


---

[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173635798
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/format/StringEventDataScheme.java ---
    @@ -15,60 +15,43 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      *******************************************************************************/
    -package org.apache.storm.eventhubs.spout;
    +package org.apache.storm.eventhubs.format;
     
    -import com.microsoft.azure.eventhubs.EventData;
    -import org.apache.storm.tuple.Fields;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    -
    -import java.io.IOException;
     import java.util.ArrayList;
     import java.util.List;
    -import java.util.Map;
     
    +import org.apache.storm.eventhubs.core.EventHubMessage;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.tuple.Fields;
     
     /**
    - * An Event Data Scheme which deserializes message payload into the raw bytes.
    - *
    + * An Event Data Scheme which deserializes message payload into the Strings. No
    + * encoding is assumed. The receiver will need to handle parsing of the string
    + * data in appropriate encoding.
    + * <p>
      * The resulting tuple would contain three items, the first being the message
      * bytes, and the second a map of properties that include metadata, which can be
      * used to determine who processes the message, and how it is processed.The third is
      * the system properties which exposes information like enqueue-time, offset and
    - * sequence number
    + * sequence number.
      */
    -public class BinaryEventDataScheme implements IEventDataScheme {
    +public class StringEventDataScheme implements IEventDataScheme {
    +
    +    private static final long serialVersionUID = 1L;
    +
    +    @Override
    +    public List<Object> deserialize(EventHubMessage eventHubMessage) {
    +        final List<Object> fieldContents = new ArrayList<Object>();
    +        final String messageData = new String(eventHubMessage.getContent());
    --- End diff --
    
    This should probably use a specific charset rather than relying on the platform default.
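    A minimal sketch of decoding with an explicit charset. A plain `byte[]` stands in for `eventHubMessage.getContent()`, and UTF-8 is an assumption: the reviewer does not name a charset, and whichever one is chosen must match what the producer wrote.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class StringSchemeSketch {

    // Decodes the payload with an explicit charset instead of new String(byte[]),
    // which silently uses the JVM's platform default charset.
    // UTF-8 is an assumption; it must match the encoding the producer used.
    static List<Object> deserialize(byte[] content) {
        final List<Object> fieldContents = new ArrayList<>();
        fieldContents.add(new String(content, StandardCharsets.UTF_8));
        return fieldContents;
    }

    public static void main(String[] args) {
        byte[] payload = "caf\u00e9".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(payload).get(0));
    }
}
```

    If the charset should be configurable, it could be passed in when the scheme is constructed instead of being hard-coded.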


---

[GitHub] storm issue #2588: Microsoft Azure EventHubs Storm Spout and Bolt improvemen...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2588
  
    And also, yes, you can change the line endings to LF, and that should work too.


---