You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by yanghua <gi...@git.apache.org> on 2018/07/02 16:30:32 UTC

[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6200#discussion_r199548194
  
    --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.pulsar;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.util.IOUtils;
    +
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.SubscriptionType;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Pulsar source (consumer) which receives messages from a topic and acknowledges messages.
    + * When checkpointing is enabled, it guarantees at least once processing semantics.
    + *
    + * <p>When checkpointing is disabled, it auto acknowledges messages based on the number of messages it has
    + * received. In this mode messages may be dropped.
    + */
    +class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageId> implements PulsarSourceBase<T> {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerSource.class);
    +
    +	private final int messageReceiveTimeoutMs = 100;
    +	private final String serviceUrl;
    +	private final String topic;
    +	private final String subscriptionName;
    +	private final DeserializationSchema<T> deserializer;
    +
    +	private PulsarClient client;
    +	private Consumer<byte[]> consumer;
    +
    +	private boolean isCheckpointingEnabled;
    +
    +	private final long acknowledgementBatchSize;
    +	private long batchCount;
    +	private long totalMessageCount;
    +
    +	private transient volatile boolean isRunning;
    +
    +	PulsarConsumerSource(PulsarSourceBuilder<T> builder) {
    +		super(MessageId.class);
    +		this.serviceUrl = builder.serviceUrl;
    +		this.topic = builder.topic;
    +		this.deserializer = builder.deserializationSchema;
    +		this.subscriptionName = builder.subscriptionName;
    +		this.acknowledgementBatchSize = builder.acknowledgementBatchSize;
    +	}
    +
    +	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		super.open(parameters);
    +
    +		final RuntimeContext context = getRuntimeContext();
    +		if (context instanceof StreamingRuntimeContext) {
    +			isCheckpointingEnabled = ((StreamingRuntimeContext) context).isCheckpointingEnabled();
    +		}
    +
    +		client = createClient();
    +		consumer = createConsumer(client);
    +
    +		isRunning = true;
    +	}
    +
    +	@Override
    +	protected void acknowledgeIDs(long checkpointId, Set<MessageId> messageIds) {
    +		if (consumer == null) {
    +			LOG.error("null consumer unable to acknowledge messages");
    +			throw new RuntimeException("null pulsar consumer unable to acknowledge messages");
    +		}
    +
    +		if (messageIds.isEmpty()) {
    +			LOG.info("no message ids to acknowledge");
    +			return;
    +		}
    +
    +		Map<String, CompletableFuture<Void>> futures = new HashMap<>(messageIds.size());
    +		for (MessageId id : messageIds) {
    +			futures.put(id.toString(), consumer.acknowledgeAsync(id));
    +		}
    +
    +		futures.forEach((k, f) -> {
    +			try {
    +				f.get();
    +			} catch (Exception e) {
    +				LOG.error("failed to acknowledge messageId " + k, e);
    +				throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
    --- End diff --
    
    Can we handle the exception more gracefully? Throwing here aborts the loop, so the remaining futures are never waited on. Is that the intended behavior?


---