Posted to issues@flink.apache.org by cckellogg <gi...@git.apache.org> on 2018/06/21 22:26:36 UTC

[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

GitHub user cckellogg opened a pull request:

    https://github.com/apache/flink/pull/6200

    [FLINK-9641] [streaming-connectors] Flink pulsar source connector

    ## What is the purpose of the change
    
    This pull request adds a [Pulsar](https://github.com/apache/incubator-pulsar) source connector, which enables Flink jobs to process messages from Pulsar topics.
    
    ## Brief change log
     - Add a PulsarConsumerSource connector
    
    ## Verifying this change
    This change adds unit tests to verify checkpointing and batch message acknowledgements.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? JavaDocs
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cckellogg/flink flink-pulsar-source-connector

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6200.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6200
    
----
commit b69fb21dc82e7922f7b7e65c94c154d56e442e5e
Author: Chris <ch...@...>
Date:   2018-06-20T21:53:06Z

    Add a simple pulsar source connector.

commit fb170c435abb2b2e09913a0430d2f73dc1edbbe1
Author: Chris <ch...@...>
Date:   2018-06-21T00:03:12Z

    Remove metrics class and add max ack batch size.

----
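
The "max ack batch size" in the second commit relates to the non-checkpointed mode described in the class Javadoc quoted further down, where the source acknowledges based on the number of messages received. As a rough sketch of that counting logic only: the class name `BatchAcknowledger` and the callback are hypothetical stand-ins rather than code from the PR, and whether the connector acknowledges cumulatively or per message is not visible in the quoted diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch, not from the PR: acknowledges once per full batch of received messages. */
final class BatchAcknowledger<I> {

    private final long batchSize;
    // Stand-in for the real client call that would acknowledge up to the given id.
    private final Consumer<I> acknowledge;
    private long batchCount;

    BatchAcknowledger(long batchSize, Consumer<I> acknowledge) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.acknowledge = acknowledge;
    }

    /** Call once per received message; triggers an acknowledgement when the batch is full. */
    void onMessage(I messageId) {
        batchCount++;
        if (batchCount >= batchSize) {
            acknowledge.accept(messageId);
            batchCount = 0;
        }
    }

    public static void main(String[] args) {
        List<Integer> acknowledged = new ArrayList<>();
        BatchAcknowledger<Integer> acker = new BatchAcknowledger<>(3, acknowledged::add);
        for (int id = 1; id <= 7; id++) {
            acker.onMessage(id);
        }
        // Ids 3 and 6 closed a batch; id 7 is still waiting for the next batch to fill.
        System.out.println(acknowledged);
    }
}
```

A larger batch size means fewer acknowledgement calls but more messages left unacknowledged at any moment, which is the trade-off a configurable maximum exposes.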


---

[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

Posted by cckellogg <gi...@git.apache.org>.
Github user cckellogg commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6200#discussion_r199909617
  
    --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.pulsar;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.util.IOUtils;
    +
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.SubscriptionType;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Pulsar source (consumer) which receives messages from a topic and acknowledges messages.
    + * When checkpointing is enabled, it guarantees at least once processing semantics.
    + *
    + * <p>When checkpointing is disabled, it auto acknowledges messages based on the number of messages it has
    + * received. In this mode messages may be dropped.
    + */
    +class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageId> implements PulsarSourceBase<T> {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerSource.class);
    +
    +	private final int messageReceiveTimeoutMs = 100;
    +	private final String serviceUrl;
    +	private final String topic;
    +	private final String subscriptionName;
    +	private final DeserializationSchema<T> deserializer;
    +
    +	private PulsarClient client;
    +	private Consumer<byte[]> consumer;
    +
    +	private boolean isCheckpointingEnabled;
    +
    +	private final long acknowledgementBatchSize;
    +	private long batchCount;
    +	private long totalMessageCount;
    +
    +	private transient volatile boolean isRunning;
    +
    +	PulsarConsumerSource(PulsarSourceBuilder<T> builder) {
    +		super(MessageId.class);
    +		this.serviceUrl = builder.serviceUrl;
    +		this.topic = builder.topic;
    +		this.deserializer = builder.deserializationSchema;
    +		this.subscriptionName = builder.subscriptionName;
    +		this.acknowledgementBatchSize = builder.acknowledgementBatchSize;
    +	}
    +
    +	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		super.open(parameters);
    +
    +		final RuntimeContext context = getRuntimeContext();
    +		if (context instanceof StreamingRuntimeContext) {
    +			isCheckpointingEnabled = ((StreamingRuntimeContext) context).isCheckpointingEnabled();
    +		}
    +
    +		client = createClient();
    +		consumer = createConsumer(client);
    +
    +		isRunning = true;
    +	}
    +
    +	@Override
    +	protected void acknowledgeIDs(long checkpointId, Set<MessageId> messageIds) {
    +		if (consumer == null) {
    +			LOG.error("null consumer unable to acknowledge messages");
    +			throw new RuntimeException("null pulsar consumer unable to acknowledge messages");
    +		}
    +
    +		if (messageIds.isEmpty()) {
    +			LOG.info("no message ids to acknowledge");
    +			return;
    +		}
    +
    +		Map<String, CompletableFuture<Void>> futures = new HashMap<>(messageIds.size());
    +		for (MessageId id : messageIds) {
    +			futures.put(id.toString(), consumer.acknowledgeAsync(id));
    +		}
    +
    +		futures.forEach((k, f) -> {
    +			try {
    +				f.get();
    +			} catch (Exception e) {
    +				LOG.error("failed to acknowledge messageId " + k, e);
    +				throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
    --- End diff --
    
    Can I modify the Set that is passed in? If not I can remove the throwing of the RuntimeException. Thoughts?


---

[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6200#discussion_r199548194
  
    --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java ---
    @@ -0,0 +1,203 @@
    +	@Override
    +	protected void acknowledgeIDs(long checkpointId, Set<MessageId> messageIds) {
    +		if (consumer == null) {
    +			LOG.error("null consumer unable to acknowledge messages");
    +			throw new RuntimeException("null pulsar consumer unable to acknowledge messages");
    +		}
    +
    +		if (messageIds.isEmpty()) {
    +			LOG.info("no message ids to acknowledge");
    +			return;
    +		}
    +
    +		Map<String, CompletableFuture<Void>> futures = new HashMap<>(messageIds.size());
    +		for (MessageId id : messageIds) {
    +			futures.put(id.toString(), consumer.acknowledgeAsync(id));
    +		}
    +
    +		futures.forEach((k, f) -> {
    +			try {
    +				f.get();
    +			} catch (Exception e) {
    +				LOG.error("failed to acknowledge messageId " + k, e);
    +				throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
    --- End diff --
    
    Can we handle the exception more gracefully? The throw aborts the loop, so the remaining futures are never awaited. Is that what you intend?
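
One possible reading of this suggestion, as a minimal sketch: wait for every future first, collect the ids that failed, and let the caller decide once afterwards whether to fail the checkpoint. The helper `AckWaiter` is hypothetical and not part of the PR. It uses `join()` instead of the `get()` in the diff only to avoid checked exceptions in the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical helper, not from the PR: awaits every acknowledgement before reporting failures. */
final class AckWaiter {

    private AckWaiter() {}

    /**
     * Waits for all futures and returns the keys of those that did not complete normally.
     * A failure is recorded instead of thrown, so the remaining futures are still awaited.
     */
    static List<String> awaitAll(Map<String, CompletableFuture<Void>> futures) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, CompletableFuture<Void>> entry : futures.entrySet()) {
            try {
                entry.getValue().join();
            } catch (CompletionException | CancellationException e) {
                failed.add(entry.getKey());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Void>> futures = new LinkedHashMap<>();
        futures.put("id-1", CompletableFuture.completedFuture(null));
        CompletableFuture<Void> broken = new CompletableFuture<>();
        broken.completeExceptionally(new IllegalStateException("ack failed"));
        futures.put("id-2", broken);
        futures.put("id-3", CompletableFuture.completedFuture(null));

        List<String> failed = awaitAll(futures);
        if (!failed.isEmpty()) {
            // The decision to fail is made once, after all futures have been awaited.
            System.out.println("failed to acknowledge: " + failed);
        }
    }
}
```

Whether the caller should then throw, log, or retry the failed ids depends on the delivery guarantee the source wants to keep, which this sketch leaves open.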


---

[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

Posted by cckellogg <gi...@git.apache.org>.
Github user cckellogg commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6200#discussion_r199909645
  
    --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Defaults.java ---
    @@ -0,0 +1,30 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.pulsar;
    +
    +/**
    + * Default values for Pulsar connectors.
    + */
    +public class Defaults {
    --- End diff --
    
    will remove.


---

[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6200#discussion_r199546643
  
    --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Defaults.java ---
    @@ -0,0 +1,30 @@
    +package org.apache.flink.streaming.connectors.pulsar;
    +
    +/**
    + * Default values for Pulsar connectors.
    + */
    +public class Defaults {
    --- End diff --
    
    Since this class is used only by `PulsarSourceBuilder`, I think we can move the constants into `PulsarSourceBuilder` and remove this class to reduce the maintenance cost.
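
A sketch of what that could look like, under stated assumptions: the class name `SourceBuilderSketch`, the constant names, and the default values are all illustrative and not taken from the PR. Only the field names `serviceUrl` and `acknowledgementBatchSize` appear in the quoted diff.

```java
/** Hypothetical sketch, not from the PR: a builder that owns the defaults it is the only reader of. */
final class SourceBuilderSketch {

    // Illustrative defaults; the actual values in the PR are not shown in this thread.
    private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
    private static final long DEFAULT_ACK_BATCH_SIZE = 100L;

    private String serviceUrl = DEFAULT_SERVICE_URL;
    private long acknowledgementBatchSize = DEFAULT_ACK_BATCH_SIZE;

    SourceBuilderSketch serviceUrl(String serviceUrl) {
        this.serviceUrl = serviceUrl;
        return this;
    }

    SourceBuilderSketch acknowledgementBatchSize(long size) {
        if (size <= 0) {
            throw new IllegalArgumentException("acknowledgementBatchSize must be positive");
        }
        this.acknowledgementBatchSize = size;
        return this;
    }

    String getServiceUrl() {
        return serviceUrl;
    }

    long getAcknowledgementBatchSize() {
        return acknowledgementBatchSize;
    }

    public static void main(String[] args) {
        SourceBuilderSketch builder = new SourceBuilderSketch().acknowledgementBatchSize(5L);
        System.out.println(builder.getServiceUrl() + " " + builder.getAcknowledgementBatchSize());
    }
}
```

Keeping the constants private to the builder means there is no separate public class to document or keep in sync.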


---

[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

Posted by cckellogg <gi...@git.apache.org>.
Github user cckellogg commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6200#discussion_r203863637
  
    --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java ---
    @@ -0,0 +1,203 @@
    +	@Override
    +	protected void acknowledgeIDs(long checkpointId, Set<MessageId> messageIds) {
    +		if (consumer == null) {
    +			LOG.error("null consumer unable to acknowledge messages");
    +			throw new RuntimeException("null pulsar consumer unable to acknowledge messages");
    +		}
    +
    +		if (messageIds.isEmpty()) {
    +			LOG.info("no message ids to acknowledge");
    +			return;
    +		}
    +
    +		Map<String, CompletableFuture<Void>> futures = new HashMap<>(messageIds.size());
    +		for (MessageId id : messageIds) {
    +			futures.put(id.toString(), consumer.acknowledgeAsync(id));
    +		}
    +
    +		futures.forEach((k, f) -> {
    +			try {
    +				f.get();
    +			} catch (Exception e) {
    +				LOG.error("failed to acknowledge messageId " + k, e);
    +				throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
    --- End diff --
    
    @yanghua can the set passed in be modified?


---

[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

Posted by cckellogg <gi...@git.apache.org>.
Github user cckellogg commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6200#discussion_r199909775
  
    --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java ---
    @@ -0,0 +1,30 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.pulsar;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
    +
    +/**
    + * Base class for pulsar sources.
    + * @param <T>
    --- End diff --
    
    will add a comment


---

[GitHub] flink issue #6200: [FLINK-9641] [streaming-connectors] Flink pulsar source c...

Posted by sijie <gi...@git.apache.org>.
Github user sijie commented on the issue:

    https://github.com/apache/flink/pull/6200
  
    Could anyone from the Flink community help review this PR?


---

[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6200#discussion_r199551693
  
    --- Diff: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java ---
    @@ -0,0 +1,513 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.pulsar;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.serialization.SimpleStringSchema;
    +import org.apache.flink.api.common.state.OperatorStateStore;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.StreamSource;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
    +
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.ConsumerStats;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.Schema;
    +import org.apache.pulsar.client.impl.MessageImpl;
    +import org.junit.After;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +
    +import java.util.ArrayDeque;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.mockito.Matchers.any;
    +
    +/**
    + * Tests for the PulsarConsumerSource. The source supports two operation modes.
    + * 1) At-least-once (when checkpointed) with Pulsar message acknowledgements and the deduplication mechanism in
    + *    {@link org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase}.
    + * 2) No strong delivery guarantees (without checkpointing) with Pulsar acknowledging messages
    + *    after it receives x number of messages.
    + *
    + * <p>These tests assume that the MessageIds are monotonically increasing. That doesn't have to be the
    + * case. The MessageId is used to uniquely identify messages.
    + */
    +public class PulsarConsumerSourceTests {
    +
    +	private PulsarConsumerSource<String> source;
    +
    +	private TestConsumer consumer;
    +
    +	private TestSourceContext context;
    +
    +	private Thread sourceThread;
    +
    +	private Exception exception;
    --- End diff --
    
    So many blank lines are not necessary.


---

[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6200#discussion_r199549195
  
    --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java ---
    @@ -0,0 +1,30 @@
    +/**
    + * Base class for pulsar sources.
    + * @param <T>
    --- End diff --
    
    Please describe the generic type parameter `<T>`.


---

[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

Posted by cckellogg <gi...@git.apache.org>.
Github user cckellogg commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6200#discussion_r199909793
  
    --- Diff: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java ---
    @@ -0,0 +1,513 @@
    +public class PulsarConsumerSourceTests {
    +
    +	private PulsarConsumerSource<String> source;
    +
    +	private TestConsumer consumer;
    +
    +	private TestSourceContext context;
    +
    +	private Thread sourceThread;
    +
    +	private Exception exception;
    --- End diff --
    
    will fix


---