You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2016/03/07 17:53:50 UTC

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/1771

    [FLINK-3311/FLINK-3332] Add Cassandra connector

    This PR is a combination of #1640 and #1660, essentially providing a mid-merge view of both PR's. As they reside in the same module and expose similar functionalities, several things were changed in both commits to make the code more similar.
    
    Such changes include formatting, field ordering, code in-lining, package declarations, file names, test unification, and the addition of a central method to create sinks (CassandraSink.addSink(DataStream<T> input)) to reduce complexity for the user resulting from different sink implementations.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink cass_final

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1771.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1771
    
----
commit 1f296cba0f85696e2ff4ae48bbbdd8a7eaf6c8e7
Author: Andrea Sella <an...@radicalbit.io>
Date:   2016-03-02T15:28:11Z

    [FLINK-3311] Add At-Least-Once Cassandra connector

commit 7597dcaf976818b23a91872650f355e0505cdd59
Author: zentol <s....@web.de>
Date:   2016-03-03T10:58:59Z

    [FLINK-3332] Add Exactly-Once Cassandra connector

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r56825317
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java ---
    @@ -0,0 +1,197 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.runtime.io.disk.InputViewIterator;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.HashSet;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.UUID;
    +
    +/**
    + * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing
    + * mechanism and can provide exactly-once guarantees; depending on the storage backend and sink/committer implementation.
    + * <p/>
    + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
    + * checkpoint is completed.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public abstract class GenericAtLeastOnceSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
    --- End diff --
    
    I've struggled a bit with the name for this class, but came to the conclusion that since class itself does not provide exactly-once guarantees for all use-cases. As such the GenericAtLeastOnceSink name is intended.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1771#issuecomment-207421723
  
    I've tested the change again on a cluster and locally. The mode without WAL works fine, the WAL-variant fails on (or quickly after) recovery.
    It seems to be an issue with the `FsStatebackend`, not with this PR. Still I'll wait with the merging until we understood the issue.
    
    ![image](https://cloud.githubusercontent.com/assets/89049/14383013/6ed277de-fd92-11e5-8387-4e09575b9cce.png)
    
    ![image](https://cloud.githubusercontent.com/assets/89049/14383018/776e0368-fd92-11e5-9bfe-cce5fd9d0277.png)
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r56156990
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.batch.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.Row;
    +import com.datastax.driver.core.Session;
    +import com.google.common.base.Strings;
    +import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
    +import org.apache.flink.api.common.io.NonParallelInput;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.GenericInputSplit;
    +import org.apache.flink.core.io.InputSplit;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +
    +/**
    + * InputFormat to read data from Apache Cassandra and generate ${@link Tuple}.
    + *
    + * @param <OUT> type of Tuple
    + */
    +public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
    +	private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormat.class);
    +
    +	private final String query;
    +	private final ClusterBuilder builder;
    +
    +	private transient Cluster cluster;
    +	private transient Session session;
    +	private transient ResultSet resultSet;
    +
    +	public CassandraInputFormat(String query, ClusterBuilder builder) {
    +		if (Strings.isNullOrEmpty(query)) {
    +			throw new IllegalArgumentException("Query cannot be null or empty");
    +		}
    +		if (builder == null) {
    +			throw new IllegalArgumentException("Builder cannot be null.");
    +		}
    +		this.query = query;
    +		this.builder = builder;
    +	}
    +
    +	@Override
    +	public void configure(Configuration parameters) {
    +		this.cluster = builder.getCluster();
    +	}
    +
    +	@Override
    +	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
    +		return cachedStatistics;
    +	}
    +
    +	/**
    +	 * Opens a Session and executes the query.
    +	 *
    +	 * @param ignored
    +	 * @throws IOException
    +	 */
    +	@Override
    +	public void open(InputSplit ignored) throws IOException {
    +		this.session = cluster.connect();
    +		this.resultSet = session.execute(query);
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return resultSet.isExhausted();
    +	}
    +
    +	@Override
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +		final Row item = resultSet.one();
    +		for (int i = 0; i < reuse.getArity(); i++) {
    +			reuse.setField(item.getObject(i), i);
    +		}
    +		return reuse;
    +	}
    +
    +	@Override
    +	public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
    +		GenericInputSplit[] split = {new GenericInputSplit(0, 1)};
    +		return split;
    +	}
    +
    +	@Override
    +	public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
    +		return new DefaultInputSplitAssigner(inputSplits);
    +	}
    +
    +	/**
    +	 * Closes all resources used.
    +	 */
    +	@Override
    +	public void close() throws IOException {
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.info("Inputformat couldn't be closed - " + e.getMessage());
    +		}
    +
    +		try {
    +			cluster.close();
    +		} catch (Exception e) {
    +			LOG.info("Inputformat couldn't be closed - " + e.getMessage());
    --- End diff --
    
    I would add the cause in both cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58561933
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java ---
    @@ -0,0 +1,136 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.BoundStatement;
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * Sink that emits its input elements into a Cassandra database. This sink stores incoming records within a 
    + * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to cassandra
    + * if a checkpoint is completed.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericAtLeastOnceSink<IN> {
    +	protected transient Cluster cluster;
    +	protected transient Session session;
    +
    +	private final String insertQuery;
    +	private transient PreparedStatement preparedStatement;
    +
    +	private transient Throwable exception = null;
    +	private transient FutureCallback<ResultSet> callback;
    +
    +	private ClusterBuilder builder;
    +
    +	private int updatesSent = 0;
    +	private AtomicInteger updatesConfirmed = new AtomicInteger(0);
    +
    +	private transient Object[] fields;
    +
    +	protected CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, String jobID, CheckpointCommitter committer) throws Exception {
    +		super(committer, serializer, jobID);
    +		this.insertQuery = insertQuery;
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	public void open() throws Exception {
    +		super.open();
    +		if (!getRuntimeContext().isCheckpointingEnabled()) {
    +			throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
    +		}
    +		this.callback = new FutureCallback<ResultSet>() {
    +			@Override
    +			public void onSuccess(ResultSet resultSet) {
    +				updatesConfirmed.incrementAndGet();
    +			}
    +
    +			@Override
    +			public void onFailure(Throwable throwable) {
    +				exception = throwable;
    +			}
    +		};
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +		preparedStatement = session.prepare(insertQuery);
    +
    +		fields = new Object[((TupleSerializer<IN>) serializer).getArity()];
    +	}
    +
    +	@Override
    +	public void close() throws Exception {
    +		super.close();
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    +		}
    +		try {
    +			cluster.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing cluster.", e);
    +		}
    +	}
    +
    +	@Override
    +	protected void sendValues(Iterable<IN> values, long timestamp) throws Exception {
    +		//verify that no query failed until now
    +		if (exception != null) {
    +			throw new Exception(exception);
    +		}
    +		//set values for prepared statement
    +		for (IN value : values) {
    +			for (int x = 0; x < value.getArity(); x++) {
    +				fields[x] = value.getField(x);
    +			}
    +			//insert values and send to cassandra
    +			BoundStatement s = preparedStatement.bind(fields);
    +			s.setDefaultTimestamp(timestamp);
    +			ResultSetFuture result = session.executeAsync(s);
    +			updatesSent++;
    +			if (result != null) {
    +				//add callback to detect errors
    +				Futures.addCallback(result, callback);
    +			}
    +		}
    +		while (updatesSent != updatesConfirmed.get()) {
    +			try {
    +				Thread.sleep(100);
    +			} catch (InterruptedException e) {
    --- End diff --
    
    I would leave the while loop on an interrupt.
    It might happen that the thread gets locked in the loop because it can not be interrupted.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63678427
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java ---
    @@ -0,0 +1,192 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.runtime.io.disk.InputViewIterator;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.HashSet;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.UUID;
    +
    +/**
    + * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing
    + * mechanism and can provide exactly-once guarantees; depending on the storage backend and sink/committer implementation.
    + * <p/>
    + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
    + * checkpoint is completed.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public abstract class GenericAtLeastOnceSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
    +	protected static final Logger LOG = LoggerFactory.getLogger(GenericAtLeastOnceSink.class);
    +	private final CheckpointCommitter committer;
    +	private transient AbstractStateBackend.CheckpointStateOutputView out;
    +	protected final TypeSerializer<IN> serializer;
    +	private final String id;
    +
    +	private ExactlyOnceState state = new ExactlyOnceState();
    +
    +	public GenericAtLeastOnceSink(CheckpointCommitter committer, TypeSerializer<IN> serializer, String jobID) throws Exception {
    +		this.committer = committer;
    +		this.serializer = serializer;
    +		this.id = UUID.randomUUID().toString();
    +		this.committer.setJobId(jobID);
    +		this.committer.createResource();
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		committer.setOperatorId(id);
    +		committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
    +		committer.open();
    +	}
    +
    +	public void close() throws Exception {
    +		committer.close();
    +	}
    +
    +	/**
    +	 * Saves a handle in the state.
    +	 *
    +	 * @param checkpointId
    +	 * @throws IOException
    +	 */
    +	private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {
    +		//only add handle if a new OperatorState was created since the last snapshot
    +		if (out != null) {
    +			StateHandle<DataInputView> handle = out.closeAndGetHandle();
    +			if (state.pendingHandles.containsKey(checkpointId)) {
    +				//we already have a checkpoint stored for that ID that may have been partially written,
    +				//so we discard this "alternate version" and use the stored checkpoint
    +				handle.discardState();
    +			} else {
    +				state.pendingHandles.put(checkpointId, new Tuple2<>(timestamp, handle));
    +			}
    +			out = null;
    +		}
    +	}
    +
    +	@Override
    +	public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
    +		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
    +		saveHandleInState(checkpointId, timestamp);
    +		taskState.setFunctionState(state);
    +		return taskState;
    +	}
    +
    +	@Override
    +	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
    +		super.restoreState(state, recoveryTimestamp);
    +		this.state = (ExactlyOnceState) state.getFunctionState();
    +
    +		out = null;
    +	}
    +
    +	@Override
    +	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    +		super.notifyOfCompletedCheckpoint(checkpointId);
    +
    +		synchronized (state.pendingHandles) {
    +			Set<Long> pastCheckpointIds = state.pendingHandles.keySet();
    +			Set<Long> checkpointsToRemove = new HashSet<>();
    +			for (Long pastCheckpointId : pastCheckpointIds) {
    +				if (pastCheckpointId <= checkpointId) {
    +					if (!committer.isCheckpointCommitted(pastCheckpointId)) {
    --- End diff --
    
    whether it triggers a lookup is up to the Committer implementation. You could always keep a local version, and only lookup the value when the local version is null.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63856337
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.Session;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
    + * database.
    + * <p/>
    + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
    + */
    +public class CassandraCommitter extends CheckpointCommitter {
    +	private ClusterBuilder builder;
    +	private transient Cluster cluster;
    +	private transient Session session;
    +
    +	private String keySpace = "flink_auxiliary";
    +	private String table = "checkpoints_";
    +
    +	private transient PreparedStatement deleteStatement;
    +	private transient PreparedStatement updateStatement;
    +	private transient PreparedStatement selectStatement;
    +
    +	public CassandraCommitter(ClusterBuilder builder) {
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	public CassandraCommitter(ClusterBuilder builder, String keySpace) {
    +		this(builder);
    +		this.keySpace = keySpace;
    +	}
    +
    +	/**
    +	 * Internally used to set the job ID after instantiation.
    +	 *
    +	 * @param id
    +	 * @throws Exception
    +	 */
    +	public void setJobId(String id) throws Exception {
    +		super.setJobId(id);
    +		table += id;
    +	}
    +
    +	/**
    +	 * Generates the necessary tables to store information.
    +	 *
    +	 * @return
    +	 * @throws Exception
    +	 */
    +	@Override
    +	public void createResource() throws Exception {
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", keySpace));
    +		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table));
    +
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    --- End diff --
    
    session.execute() throws an exception if the resource cannot be created. This exception is propagated, which is the important part.
    
    exceptions while closing the session isn't particularly important.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63678203
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.Session;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
    + * database.
    + * <p/>
    + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
    + */
    +public class CassandraCommitter extends CheckpointCommitter {
    +	private ClusterBuilder builder;
    +	private transient Cluster cluster;
    +	private transient Session session;
    +
    +	private String keySpace = "flink_auxiliary";
    +	private String table = "checkpoints_";
    +
    +	private transient PreparedStatement deleteStatement;
    +	private transient PreparedStatement updateStatement;
    +	private transient PreparedStatement selectStatement;
    +
    +	public CassandraCommitter(ClusterBuilder builder) {
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	public CassandraCommitter(ClusterBuilder builder, String keySpace) {
    +		this(builder);
    +		this.keySpace = keySpace;
    +	}
    +
    +	/**
    +	 * Internally used to set the job ID after instantiation.
    +	 *
    +	 * @param id
    +	 * @throws Exception
    +	 */
    +	public void setJobId(String id) throws Exception {
    +		super.setJobId(id);
    +		table += id;
    +	}
    +
    +	/**
    +	 * Generates the necessary tables to store information.
    +	 *
    +	 * @return
    +	 * @throws Exception
    +	 */
    +	@Override
    +	public void createResource() throws Exception {
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", keySpace));
    +		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table));
    +
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    +		}
    +		try {
    +			cluster.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing cluster.", e);
    +		}
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		if (builder == null) {
    +			throw new RuntimeException("No ClusterBuilder was set.");
    +		}
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		deleteStatement = session.prepare(String.format("DELETE FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +		updateStatement = session.prepare(String.format("UPDATE %s.%s set checkpoint_id=? where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +		selectStatement = session.prepare(String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +
    +		session.execute(String.format("INSERT INTO %s.%s (sink_id, sub_id, checkpoint_id) values ('%s', %d, " + -1 + ");", keySpace, table, operatorId, subtaskId));
    +	}
    +
    +	@Override
    +	public void close() throws Exception {
    +		session.executeAsync(deleteStatement.bind());
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    +		}
    +		try {
    +			cluster.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing cluster.", e);
    +		}
    +	}
    +
    +	@Override
    +	public void commitCheckpoint(long checkpointID) {
    +		session.execute(updateStatement.bind(checkpointID));
    --- End diff --
    
    an exception is thrown in case of failure.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58561096
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.Session;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.ListenableFuture;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +
    +/**
    + * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
    +	protected static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBase.class);
    +	protected transient Cluster cluster;
    +	protected transient Session session;
    +
    +	protected transient Throwable exception = null;
    +	protected transient FutureCallback<V> callback;
    +
    +	private final ClusterBuilder builder;
    +
    +	protected CassandraSinkBase(ClusterBuilder builder) {
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	@Override
    +	public void open(Configuration configuration) {
    +		this.callback = new FutureCallback<V>() {
    +			@Override
    +			public void onSuccess(V ignored) {
    +			}
    +
    +			@Override
    +			public void onFailure(Throwable t) {
    +				exception = t;
    --- End diff --
    
    Can you log the exception as well. Maybe invoke() is never called after the failure and nobody knows what's going on


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58548774
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
    @@ -0,0 +1,126 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.Session;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
    + * database.
    + * <p/>
    + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
    + */
    +public class CassandraCommitter extends CheckpointCommitter {
    +	private ClusterBuilder builder;
    +	private transient Cluster cluster;
    +	private transient Session session;
    +
    +	private static final String KEYSPACE = "flink_auxiliary";
    +	private String TABLE = "checkpoints_";
    --- End diff --
    
    by convention, (non static) fields are lowercase


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1771#issuecomment-209400438
  
    @rmetzger I hav efixed the issue you encountered. The problem was that the JM tries to discard all state belonging to pending checkpoints upon job failure, which effectively purged the write-ahead log. The simple fix is to not do anything in ExactlyOnceState#discardState.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58999599
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
    @@ -0,0 +1,126 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.Session;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
    + * database.
    + * <p/>
    + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
    + */
    +public class CassandraCommitter extends CheckpointCommitter {
    +	private ClusterBuilder builder;
    +	private transient Cluster cluster;
    +	private transient Session session;
    +
    +	private static final String KEYSPACE = "flink_auxiliary";
    +	private String TABLE = "checkpoints_";
    +
    +	private transient PreparedStatement deleteStatement;
    +	private transient PreparedStatement updateStatement;
    +	private transient PreparedStatement selectStatement;
    +
    +	public CassandraCommitter(ClusterBuilder builder) {
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	/**
    +	 * Internally used to set the job ID after instantiation.
    +	 *
    +	 * @param id
    +	 * @throws Exception
    +	 */
    +	public void setJobId(String id) throws Exception {
    +		super.setJobId(id);
    +		TABLE += id;
    +	}
    +
    +	/**
    +	 * Generates the necessary tables to store information.
    +	 *
    +	 * @return
    +	 * @throws Exception
    +	 */
    +	@Override
    +	public void createResource() throws Exception {
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", KEYSPACE));
    +		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", KEYSPACE, TABLE));
    --- End diff --
    
    Users still can not pass custom keyspaces


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58548996
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
    @@ -0,0 +1,126 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.Session;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
    + * database.
    + * <p/>
    + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
    + */
    +public class CassandraCommitter extends CheckpointCommitter {
    +	private ClusterBuilder builder;
    +	private transient Cluster cluster;
    +	private transient Session session;
    +
    +	private static final String KEYSPACE = "flink_auxiliary";
    +	private String TABLE = "checkpoints_";
    +
    +	private transient PreparedStatement deleteStatement;
    +	private transient PreparedStatement updateStatement;
    +	private transient PreparedStatement selectStatement;
    +
    +	public CassandraCommitter(ClusterBuilder builder) {
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	/**
    +	 * Internally used to set the job ID after instantiation.
    +	 *
    +	 * @param id
    +	 * @throws Exception
    +	 */
    +	public void setJobId(String id) throws Exception {
    +		super.setJobId(id);
    +		TABLE += id;
    +	}
    +
    +	/**
    +	 * Generates the necessary tables to store information.
    +	 *
    +	 * @return
    +	 * @throws Exception
    +	 */
    +	@Override
    +	public void createResource() throws Exception {
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", KEYSPACE));
    +		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", KEYSPACE, TABLE));
    --- End diff --
    
    I think it would be better to allow users passing a custom keyspace.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58557862
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
    @@ -0,0 +1,313 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSink;
    +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.operators.ChainingStrategy;
    +import org.apache.flink.streaming.api.transformations.SinkTransformation;
    +import org.apache.flink.streaming.api.transformations.StreamTransformation;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +import java.util.UUID;
    +
    +/**
    + * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
    + *
    + * @param <IN> input type
    + */
    +public class CassandraSink<IN> {
    +	private static final String jobID = UUID.randomUUID().toString().replace("-", "_");
    +	private final boolean useDataStreamSink;
    +	private DataStreamSink<IN> sink1;
    +	private SingleOutputStreamOperator<IN> sink2;
    +
    +	private CassandraSink(DataStreamSink<IN> sink) {
    +		sink1 = sink;
    +		useDataStreamSink = true;
    +	}
    +
    +	private CassandraSink(SingleOutputStreamOperator<IN> sink) {
    +		sink2 = sink;
    +		useDataStreamSink = false;
    +	}
    +
    +	private SinkTransformation<IN> getSinkTransformation() {
    +		return sink1.getTransformation();
    +	}
    +
    +	private StreamTransformation<IN> getStreamTransformation() {
    +		return sink2.getTransformation();
    +	}
    +
    +	/**
    +	 * Sets the name of this sink. This name is
    +	 * used by the visualization and logging during runtime.
    +	 *
    +	 * @return The named sink.
    +	 */
    +	public CassandraSink<IN> name(String name) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setName(name);
    +		} else {
    +			getStreamTransformation().setName(name);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets an ID for this operator.
    +	 * <p/>
    +	 * <p>The specified ID is used to assign the same operator ID across job
    +	 * submissions (for example when starting a job from a savepoint).
    +	 * <p/>
    +	 * <p><strong>Important</strong>: this ID needs to be unique per
    +	 * transformation and job. Otherwise, job submission will fail.
    +	 *
    +	 * @param uid The unique user-specified ID of this transformation.
    +	 * @return The operator with the specified ID.
    +	 */
    +	public CassandraSink<IN> uid(String uid) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setUid(uid);
    +		} else {
    +			getStreamTransformation().setUid(uid);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets the parallelism for this sink. The degree must be higher than zero.
    +	 *
    +	 * @param parallelism The parallelism for this sink.
    +	 * @return The sink with set parallelism.
    +	 */
    +	public CassandraSink<IN> setParallelism(int parallelism) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setParallelism(parallelism);
    +		} else {
    +			getStreamTransformation().setParallelism(parallelism);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Turns off chaining for this operator so thread co-location will not be
    +	 * used as an optimization.
    +	 * <p/>
    +	 * <p/>
    +	 * Chaining can be turned off for the whole
    +	 * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()}
    +	 * however it is not advised for performance considerations.
    +	 *
    +	 * @return The sink with chaining disabled
    +	 */
    +	public CassandraSink<IN> disableChaining() {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER);
    +		} else {
    +			getStreamTransformation().setChainingStrategy(ChainingStrategy.NEVER);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets the slot sharing group of this operation. Parallel instances of
    +	 * operations that are in the same slot sharing group will be co-located in the same
    +	 * TaskManager slot, if possible.
    +	 * <p/>
    +	 * <p>Operations inherit the slot sharing group of input operations if all input operations
    +	 * are in the same slot sharing group and no slot sharing group was explicitly specified.
    +	 * <p/>
    +	 * <p>Initially an operation is in the default slot sharing group. An operation can be put into
    +	 * the default group explicitly by setting the slot sharing group to {@code "default"}.
    +	 *
    +	 * @param slotSharingGroup The slot sharing group name.
    +	 */
    +	public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
    +		} else {
    +			getStreamTransformation().setSlotSharingGroup(slotSharingGroup);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Writes a DataStream into a Cassandra database.
    +	 *
    +	 * @param input input DataStream
    +	 * @param <IN>  input type
    +	 * @return CassandraSinkBuilder, to further configure the sink
    +	 */
    +	public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
    +		if (input.getType() instanceof TupleTypeInfo) {
    +			DataStream<T> tupleInput = (DataStream<T>) input;
    +			return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
    +		} else {
    +			return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
    +		}
    +	}
    +
    +	public abstract static class CassandraSinkBuilder<IN> {
    +		protected final DataStream<IN> input;
    +		protected final TypeSerializer<IN> serializer;
    +		protected final TypeInformation<IN> typeInfo;
    +		protected ClusterBuilder builder;
    +		protected String query;
    +		protected CheckpointCommitter committer;
    +		protected boolean isWriteAheadLogEnabled;
    +
    +		public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
    +			this.input = input;
    +			this.typeInfo = typeInfo;
    +			this.serializer = serializer;
    +		}
    +
    +		/**
    +		 * Sets the query that is to be executed for every record.
    +		 *
    +		 * @param query query to use
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setQuery(String query) {
    +			this.query = query;
    +			return this;
    +		}
    +
    +		/**
    +		 * Sets the cassandra host to connect to.
    +		 *
    +		 * @param host host to connect to
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setHost(String host) {
    +			return setHost(host, 9042);
    +		}
    +
    +		/**
    +		 * Sets the cassandra host/port to connect to.
    +		 *
    +		 * @param host host to connect to
    +		 * @param port port to connect to
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setHost(final String host, final int port) {
    +			builder = new ClusterBuilder() {
    +				@Override
    +				protected Cluster buildCluster(Cluster.Builder builder) {
    +					return builder.addContactPoint(host).withPort(port).build();
    +				}
    +			};
    +			return this;
    +		}
    +
    +		/**
    +		 * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to cassandra.
    +		 *
    +		 * @param builder ClusterBuilder to configure the connection to cassandra
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
    +			this.builder = builder;
    +			return this;
    +		}
    +
    +		/**
    +		 * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use
    +		 * idempotent updates.
    +		 *
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> enableWriteAheadLog() {
    +			this.isWriteAheadLogEnabled = true;
    +			return this;
    +		}
    +
    +		/**
    +		 * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use
    +		 * idempotent updates.
    +		 *
    +		 * @param committer CheckpointCommitter, that stores informationa bout completed checkpoints in an external
    +		 *                  resource. By default this information is stored within a separate table within Cassandra.      
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer) {
    +			this.isWriteAheadLogEnabled = true;
    +			this.committer = committer;
    +			return this;
    +		}
    +
    +		/**
    +		 * Finalizes the configuration of this sink.
    +		 *
    +		 * @return finalized sink
    +		 * @throws Exception
    +		 */
    +		public abstract CassandraSink<IN> build() throws Exception;
    +		
    +		protected void sanityCheck() {
    --- End diff --
    
    very good!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1771#issuecomment-209882761
  
    Thank you for the fix. I'll take a look at it soon.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63869055
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.Session;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
    + * database.
    + * <p/>
    + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
    + */
    +public class CassandraCommitter extends CheckpointCommitter {
    +	private ClusterBuilder builder;
    +	private transient Cluster cluster;
    +	private transient Session session;
    +
    +	private String keySpace = "flink_auxiliary";
    +	private String table = "checkpoints_";
    +
    +	private transient PreparedStatement deleteStatement;
    +	private transient PreparedStatement updateStatement;
    +	private transient PreparedStatement selectStatement;
    +
    +	public CassandraCommitter(ClusterBuilder builder) {
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	public CassandraCommitter(ClusterBuilder builder, String keySpace) {
    +		this(builder);
    +		this.keySpace = keySpace;
    +	}
    +
    +	/**
    +	 * Internally used to set the job ID after instantiation.
    +	 *
    +	 * @param id
    +	 * @throws Exception
    +	 */
    +	public void setJobId(String id) throws Exception {
    +		super.setJobId(id);
    +		table += id;
    +	}
    +
    +	/**
    +	 * Generates the necessary tables to store information.
    +	 *
    +	 * @return
    +	 * @throws Exception
    +	 */
    +	@Override
    +	public void createResource() throws Exception {
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", keySpace));
    +		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table));
    +
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    --- End diff --
    
    This means that a failed closing operation cannot leave the external system in a corrupted state?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63855149
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.Session;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
    + * database.
    + * <p/>
    + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
    + */
    +public class CassandraCommitter extends CheckpointCommitter {
    +	private ClusterBuilder builder;
    +	private transient Cluster cluster;
    +	private transient Session session;
    +
    +	private String keySpace = "flink_auxiliary";
    +	private String table = "checkpoints_";
    +
    +	private transient PreparedStatement deleteStatement;
    +	private transient PreparedStatement updateStatement;
    +	private transient PreparedStatement selectStatement;
    +
    +	public CassandraCommitter(ClusterBuilder builder) {
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	public CassandraCommitter(ClusterBuilder builder, String keySpace) {
    +		this(builder);
    +		this.keySpace = keySpace;
    +	}
    +
    +	/**
    +	 * Internally used to set the job ID after instantiation.
    +	 *
    +	 * @param id
    +	 * @throws Exception
    +	 */
    +	public void setJobId(String id) throws Exception {
    +		super.setJobId(id);
    +		table += id;
    +	}
    +
    +	/**
    +	 * Generates the necessary tables to store information.
    +	 *
    +	 * @return
    +	 * @throws Exception
    +	 */
    +	@Override
    +	public void createResource() throws Exception {
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", keySpace));
    +		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table));
    +
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    +		}
    +		try {
    +			cluster.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing cluster.", e);
    +		}
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		if (builder == null) {
    +			throw new RuntimeException("No ClusterBuilder was set.");
    +		}
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		deleteStatement = session.prepare(String.format("DELETE FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +		updateStatement = session.prepare(String.format("UPDATE %s.%s set checkpoint_id=? where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +		selectStatement = session.prepare(String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +
    +		session.execute(String.format("INSERT INTO %s.%s (sink_id, sub_id, checkpoint_id) values ('%s', %d, " + -1 + ");", keySpace, table, operatorId, subtaskId));
    +	}
    +
    +	@Override
    +	public void close() throws Exception {
    +		session.executeAsync(deleteStatement.bind());
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    +		}
    +		try {
    +			cluster.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing cluster.", e);
    +		}
    +	}
    +
    +	@Override
    +	public void commitCheckpoint(long checkpointID) {
    +		session.execute(updateStatement.bind(checkpointID));
    --- End diff --
    
    Ah ok, I see. `Session` throws` an unchecked exception.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r56156903
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/pom.xml ---
    @@ -0,0 +1,175 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-streaming-connectors</artifactId>
    +		<version>1.1-SNAPSHOT</version>
    +		<relativePath>..</relativePath>
    +	</parent>
    +
    +	<artifactId>flink-connector-cassandra_2.10</artifactId>
    +	<name>flink-connector-cassandra</name>
    +
    +	<packaging>jar</packaging>
    +
    +	<!-- Allow users to pass custom connector versions -->
    +	<properties>
    +		<cassandra.version>2.2.5</cassandra.version>
    +		<driver.version>3.0.0</driver.version>
    +	</properties>
    +
    +	<build>
    +		<plugins>
    +			<plugin>
    +				<groupId>org.apache.maven.plugins</groupId>
    +				<artifactId>maven-surefire-plugin</artifactId>
    +				<configuration>
    +					<reuseForks>true</reuseForks>
    +					<forkCount>1</forkCount>
    +				</configuration>
    +			</plugin>
    +			<plugin>
    +				<groupId>org.apache.maven.plugins</groupId>
    +				<artifactId>maven-shade-plugin</artifactId>
    +				<version>2.4.1</version>
    +				<executions>
    +					<!-- Run shade goal on package phase -->
    +					<execution>
    +						<phase>package</phase>
    +						<goals>
    +							<goal>shade</goal>
    +						</goals>
    +						<configuration>
    +							<artifactSet>
    +								<includes>
    +									<include>com.datastax.cassandra:cassandra-driver-core</include>
    +									<include>com.datastax.cassandra:cassandra-driver-mapping</include>
    +								</includes>
    +							</artifactSet>
    +							<relocations>
    +								<relocation>
    +									<pattern>com.google</pattern>
    +									<shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
    +									<excludes>
    +										<exclude>com.google.protobuf.**</exclude>
    +										<exclude>com.google.inject.**</exclude>
    +									</excludes>
    +								</relocation>
    +							</relocations>
    +						</configuration>
    +					</execution>
    +				</executions>
    +			</plugin>
    +		</plugins>
    +	</build>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-streaming-java_2.10</artifactId>
    +			<version>${project.version}</version>
    +			<scope>provided</scope>
    +		</dependency>
    +		<dependency>
    +			<groupId>com.datastax.cassandra</groupId>
    +			<artifactId>cassandra-driver-core</artifactId>
    +			<version>${driver.version}</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>org.slf4j</groupId>
    +					<artifactId>log4j-over-slf4j</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>ch.qos.logback</groupId>
    +					<artifactId>logback-classic</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>com.datastax.cassandra</groupId>
    +			<artifactId>cassandra-driver-mapping</artifactId>
    +			<version>${driver.version}</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>org.slf4j</groupId>
    +					<artifactId>log4j-over-slf4j</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>ch.qos.logback</groupId>
    +					<artifactId>logback-classic</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>com.google.guava</groupId>
    +			<artifactId>guava</artifactId>
    +			<version>16.0.1</version>
    --- End diff --
    
    Please use the `guava.version` variable here to use the same guava version across Flink


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r61421879
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
    @@ -0,0 +1,316 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSink;
    +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.operators.ChainingStrategy;
    +import org.apache.flink.streaming.api.transformations.SinkTransformation;
    +import org.apache.flink.streaming.api.transformations.StreamTransformation;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
    + *
    + * @param <IN> input type
    + */
    +public class CassandraSink<IN> {
    +	private final boolean useDataStreamSink;
    +	private DataStreamSink<IN> sink1;
    +	private SingleOutputStreamOperator<IN> sink2;
    +
    +	private CassandraSink(DataStreamSink<IN> sink) {
    +		sink1 = sink;
    +		useDataStreamSink = true;
    +	}
    +
    +	private CassandraSink(SingleOutputStreamOperator<IN> sink) {
    +		sink2 = sink;
    +		useDataStreamSink = false;
    +	}
    +
    +	private SinkTransformation<IN> getSinkTransformation() {
    +		return sink1.getTransformation();
    +	}
    +
    +	private StreamTransformation<IN> getStreamTransformation() {
    +		return sink2.getTransformation();
    +	}
    +
    +	/**
    +	 * Sets the name of this sink. This name is
    +	 * used by the visualization and logging during runtime.
    +	 *
    +	 * @return The named sink.
    +	 */
    +	public CassandraSink<IN> name(String name) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setName(name);
    +		} else {
    +			getStreamTransformation().setName(name);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets an ID for this operator.
    +	 * <p/>
    +	 * <p>The specified ID is used to assign the same operator ID across job
    +	 * submissions (for example when starting a job from a savepoint).
    +	 * <p/>
    +	 * <p><strong>Important</strong>: this ID needs to be unique per
    +	 * transformation and job. Otherwise, job submission will fail.
    +	 *
    +	 * @param uid The unique user-specified ID of this transformation.
    +	 * @return The operator with the specified ID.
    +	 */
    +	public CassandraSink<IN> uid(String uid) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setUid(uid);
    +		} else {
    +			getStreamTransformation().setUid(uid);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets the parallelism for this sink. The degree must be higher than zero.
    +	 *
    +	 * @param parallelism The parallelism for this sink.
    +	 * @return The sink with set parallelism.
    +	 */
    +	public CassandraSink<IN> setParallelism(int parallelism) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setParallelism(parallelism);
    +		} else {
    +			getStreamTransformation().setParallelism(parallelism);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Turns off chaining for this operator so thread co-location will not be
    +	 * used as an optimization.
    +	 * <p/>
    +	 * <p/>
    +	 * Chaining can be turned off for the whole
    +	 * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()}
    +	 * however it is not advised for performance considerations.
    +	 *
    +	 * @return The sink with chaining disabled
    +	 */
    +	public CassandraSink<IN> disableChaining() {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER);
    +		} else {
    +			getStreamTransformation().setChainingStrategy(ChainingStrategy.NEVER);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets the slot sharing group of this operation. Parallel instances of
    +	 * operations that are in the same slot sharing group will be co-located in the same
    +	 * TaskManager slot, if possible.
    +	 * <p/>
    +	 * <p>Operations inherit the slot sharing group of input operations if all input operations
    +	 * are in the same slot sharing group and no slot sharing group was explicitly specified.
    +	 * <p/>
    +	 * <p>Initially an operation is in the default slot sharing group. An operation can be put into
    +	 * the default group explicitly by setting the slot sharing group to {@code "default"}.
    +	 *
    +	 * @param slotSharingGroup The slot sharing group name.
    +	 */
    +	public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
    +		} else {
    +			getStreamTransformation().setSlotSharingGroup(slotSharingGroup);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Writes a DataStream into a Cassandra database.
    +	 *
    +	 * @param input input DataStream
    +	 * @param <IN>  input type
    +	 * @return CassandraSinkBuilder, to further configure the sink
    +	 */
    +	public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
    +		if (input.getType() instanceof TupleTypeInfo) {
    +			DataStream<T> tupleInput = (DataStream<T>) input;
    +			return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
    +		} else {
    +			return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
    +		}
    +	}
    +
    +	public abstract static class CassandraSinkBuilder<IN> {
    +		protected final DataStream<IN> input;
    +		protected final TypeSerializer<IN> serializer;
    +		protected final TypeInformation<IN> typeInfo;
    +		protected ClusterBuilder builder;
    +		protected String query;
    +		protected CheckpointCommitter committer;
    +		protected boolean isWriteAheadLogEnabled;
    +
    +		public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
    +			this.input = input;
    +			this.typeInfo = typeInfo;
    +			this.serializer = serializer;
    +		}
    +
    +		/**
    +		 * Sets the query that is to be executed for every record.
    +		 *
    +		 * @param query query to use
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setQuery(String query) {
    +			this.query = query;
    +			return this;
    +		}
    +
    +		/**
    +		 * Sets the cassandra host to connect to.
    +		 *
    +		 * @param host host to connect to
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setHost(String host) {
    +			return setHost(host, 9042);
    +		}
    +
    +		/**
    +		 * Sets the cassandra host/port to connect to.
    +		 *
    +		 * @param host host to connect to
    +		 * @param port port to connect to
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setHost(final String host, final int port) {
    +			if (builder != null) {
    +				throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
    +			}
    +			builder = new ClusterBuilder() {
    +				@Override
    +				protected Cluster buildCluster(Cluster.Builder builder) {
    +					return builder.addContactPoint(host).withPort(port).build();
    +				}
    +			};
    +			return this;
    +		}
    +
    +		/**
    +		 * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to cassandra.
    +		 *
    +		 * @param builder ClusterBuilder to configure the connection to cassandra
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
    +			if (builder != null) {
    --- End diff --
    
    yes. i distinctly recall having fixed this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #1771: [FLINK-3311/FLINK-3332] Add Cassandra connector

Posted by theomega <gi...@git.apache.org>.
Github user theomega commented on the issue:

    https://github.com/apache/flink/pull/1771
  
    I tried out this branch and it works like it should in the scenario I set up:
    I wrote (so only using the Sink) to a quiet complex columnfamily in a 8 node cassandra cluster. I was using a complex setup of windowed streams and all the data appeared and was perfectly readable as expected. I also had multiple sinks at the same time which also worked perfectly. I could not test the scenario @rmetzger is mentioning.
    
    Overall, I agree with @rmetzger that it should be considered to merge this to get more users to test it and report issues. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58568963
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
    @@ -0,0 +1,313 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSink;
    +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.operators.ChainingStrategy;
    +import org.apache.flink.streaming.api.transformations.SinkTransformation;
    +import org.apache.flink.streaming.api.transformations.StreamTransformation;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +import java.util.UUID;
    +
    +/**
    + * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
    + *
    + * @param <IN> input type
    + */
    +public class CassandraSink<IN> {
    +	private static final String jobID = UUID.randomUUID().toString().replace("-", "_");
    --- End diff --
    
    The problem is that there will be two `jobID`s which can lead to confusions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1771#issuecomment-217171304
  
    I just tried the PR, but the recovery after a failure doesn't seem to work:
    
    ```
    java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
    	at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:681)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:674)
    	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
    	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
    	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
    	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    	at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: Failed to fetch state handle size
    	at org.apache.flink.runtime.taskmanager.RuntimeEnvironment.acknowledgeCheckpoint(RuntimeEnvironment.java:234)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:511)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:678)
    	... 8 more
    Caused by: java.io.FileNotFoundException: File does not exist: hdfs://nameservice1/user/robert/cassandra-fs/e70d0b78b7875877f42a8ebfba463f14/chk-0/9f892bc0-b5e2-484f-a981-6e666e7ad897
    	at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
    	at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
    	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
    	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
    	at org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle.getFileSize(AbstractFileStateHandle.java:93)
    	at org.apache.flink.runtime.state.filesystem.FileStreamStateHandle.getStateSize(FileStreamStateHandle.java:58)
    	at org.apache.flink.runtime.state.AbstractStateBackend$DataInputViewHandle.getStateSize(AbstractStateBackend.java:428)
    	at org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink$ExactlyOnceState.getStateSize(GenericAtLeastOnceSink.java:190)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskStateList.getStateSize(StreamTaskStateList.java:81)
    	at org.apache.flink.runtime.taskmanager.RuntimeEnvironment.acknowledgeCheckpoint(RuntimeEnvironment.java:231)
    	... 10 more
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58556061
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
    @@ -0,0 +1,313 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSink;
    +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.operators.ChainingStrategy;
    +import org.apache.flink.streaming.api.transformations.SinkTransformation;
    +import org.apache.flink.streaming.api.transformations.StreamTransformation;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +import java.util.UUID;
    +
    +/**
    + * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
    + *
    + * @param <IN> input type
    + */
    +public class CassandraSink<IN> {
    +	private static final String jobID = UUID.randomUUID().toString().replace("-", "_");
    --- End diff --
    
    `jobID` is used within Flink a lot and has a different meaning there. Maybe it make sense to rename this to "sink id".
    
    Why is this field static? This can lead to problems when a user is using two cassandra sinks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #1771: [FLINK-3311/FLINK-3332] Add Cassandra connector

Posted by f1yegor <gi...@git.apache.org>.
Github user f1yegor commented on the issue:

    https://github.com/apache/flink/pull/1771
  
    Could somebody point to a documentation how we could manage state (pool of connections) when Sink works inside single JVM.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63856510
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.Session;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
    + * database.
    + * <p/>
    + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
    + */
    +public class CassandraCommitter extends CheckpointCommitter {
    +	private ClusterBuilder builder;
    +	private transient Cluster cluster;
    +	private transient Session session;
    +
    +	private String keySpace = "flink_auxiliary";
    +	private String table = "checkpoints_";
    +
    +	private transient PreparedStatement deleteStatement;
    +	private transient PreparedStatement updateStatement;
    +	private transient PreparedStatement selectStatement;
    +
    +	public CassandraCommitter(ClusterBuilder builder) {
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	public CassandraCommitter(ClusterBuilder builder, String keySpace) {
    +		this(builder);
    +		this.keySpace = keySpace;
    +	}
    +
    +	/**
    +	 * Internally used to set the job ID after instantiation.
    +	 *
    +	 * @param id
    +	 * @throws Exception
    +	 */
    +	public void setJobId(String id) throws Exception {
    +		super.setJobId(id);
    +		table += id;
    +	}
    +
    +	/**
    +	 * Generates the necessary tables to store information.
    +	 *
    +	 * @return
    +	 * @throws Exception
    +	 */
    +	@Override
    +	public void createResource() throws Exception {
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", keySpace));
    +		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table));
    +
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    +		}
    +		try {
    +			cluster.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing cluster.", e);
    +		}
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		if (builder == null) {
    +			throw new RuntimeException("No ClusterBuilder was set.");
    +		}
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		deleteStatement = session.prepare(String.format("DELETE FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +		updateStatement = session.prepare(String.format("UPDATE %s.%s set checkpoint_id=? where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +		selectStatement = session.prepare(String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +
    +		session.execute(String.format("INSERT INTO %s.%s (sink_id, sub_id, checkpoint_id) values ('%s', %d, " + -1 + ");", keySpace, table, operatorId, subtaskId));
    +	}
    +
    +	@Override
    +	public void close() throws Exception {
    +		session.executeAsync(deleteStatement.bind());
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    --- End diff --
    
    because some random exception caused by session.close() (like an NPE) shouldn't cause other exceptions to be hidden. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63678205
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java ---
    @@ -0,0 +1,192 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.runtime.io.disk.InputViewIterator;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.HashSet;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.UUID;
    +
    +/**
    + * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing
    + * mechanism and can provide exactly-once guarantees; depending on the storage backend and sink/committer implementation.
    + * <p/>
    + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
    + * checkpoint is completed.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public abstract class GenericAtLeastOnceSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
    +	protected static final Logger LOG = LoggerFactory.getLogger(GenericAtLeastOnceSink.class);
    +	private final CheckpointCommitter committer;
    +	private transient AbstractStateBackend.CheckpointStateOutputView out;
    +	protected final TypeSerializer<IN> serializer;
    +	private final String id;
    +
    +	private ExactlyOnceState state = new ExactlyOnceState();
    +
    +	public GenericAtLeastOnceSink(CheckpointCommitter committer, TypeSerializer<IN> serializer, String jobID) throws Exception {
    +		this.committer = committer;
    +		this.serializer = serializer;
    +		this.id = UUID.randomUUID().toString();
    +		this.committer.setJobId(jobID);
    +		this.committer.createResource();
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		committer.setOperatorId(id);
    +		committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
    +		committer.open();
    +	}
    +
    +	public void close() throws Exception {
    +		committer.close();
    +	}
    +
    +	/**
    +	 * Saves a handle in the state.
    +	 *
    +	 * @param checkpointId
    +	 * @throws IOException
    +	 */
    +	private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {
    +		//only add handle if a new OperatorState was created since the last snapshot
    +		if (out != null) {
    +			StateHandle<DataInputView> handle = out.closeAndGetHandle();
    +			if (state.pendingHandles.containsKey(checkpointId)) {
    +				//we already have a checkpoint stored for that ID that may have been partially written,
    +				//so we discard this "alternate version" and use the stored checkpoint
    +				handle.discardState();
    +			} else {
    +				state.pendingHandles.put(checkpointId, new Tuple2<>(timestamp, handle));
    +			}
    +			out = null;
    +		}
    +	}
    +
    +	@Override
    +	public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
    +		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
    +		saveHandleInState(checkpointId, timestamp);
    +		taskState.setFunctionState(state);
    +		return taskState;
    +	}
    +
    +	@Override
    +	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
    +		super.restoreState(state, recoveryTimestamp);
    +		this.state = (ExactlyOnceState) state.getFunctionState();
    +
    +		out = null;
    +	}
    +
    +	@Override
    +	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    +		super.notifyOfCompletedCheckpoint(checkpointId);
    +
    +		synchronized (state.pendingHandles) {
    +			Set<Long> pastCheckpointIds = state.pendingHandles.keySet();
    +			Set<Long> checkpointsToRemove = new HashSet<>();
    +			for (Long pastCheckpointId : pastCheckpointIds) {
    +				if (pastCheckpointId <= checkpointId) {
    +					if (!committer.isCheckpointCommitted(pastCheckpointId)) {
    +						Tuple2<Long, StateHandle<DataInputView>> handle = state.pendingHandles.get(pastCheckpointId);
    +						DataInputView in = handle.f1.getState(getUserCodeClassloader());
    +						sendValues(new ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(in, serializer), serializer), handle.f0);
    +						committer.commitCheckpoint(pastCheckpointId);
    --- End diff --
    
    yes it is synchronous. should it fail an exception is thrown.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by alkagin <gi...@git.apache.org>.
Github user alkagin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r56823697
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java ---
    @@ -0,0 +1,197 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.runtime.io.disk.InputViewIterator;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.HashSet;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.UUID;
    +
    +/**
    + * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing
    + * mechanism and can provide exactly-once guarantees; depending on the storage backend and sink/committer implementation.
    + * <p/>
    + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
    + * checkpoint is completed.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public abstract class GenericAtLeastOnceSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
    --- End diff --
    
    GenericExactlyOnceSink


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63854905
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.Session;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
    + * database.
    + * <p/>
    + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
    + */
    +public class CassandraCommitter extends CheckpointCommitter {
    +	private ClusterBuilder builder;
    +	private transient Cluster cluster;
    +	private transient Session session;
    +
    +	private String keySpace = "flink_auxiliary";
    +	private String table = "checkpoints_";
    +
    +	private transient PreparedStatement deleteStatement;
    +	private transient PreparedStatement updateStatement;
    +	private transient PreparedStatement selectStatement;
    +
    +	public CassandraCommitter(ClusterBuilder builder) {
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	public CassandraCommitter(ClusterBuilder builder, String keySpace) {
    +		this(builder);
    +		this.keySpace = keySpace;
    +	}
    +
    +	/**
    +	 * Internally used to set the job ID after instantiation.
    +	 *
    +	 * @param id
    +	 * @throws Exception
    +	 */
    +	public void setJobId(String id) throws Exception {
    +		super.setJobId(id);
    +		table += id;
    +	}
    +
    +	/**
    +	 * Generates the necessary tables to store information.
    +	 *
    +	 * @return
    +	 * @throws Exception
    +	 */
    +	@Override
    +	public void createResource() throws Exception {
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", keySpace));
    +		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table));
    +
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    --- End diff --
    
    Why not propagating the exception here and simply logging it? Shouldn't the whole thing fail if we cannot create the necessary resources? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #1771: [FLINK-3311/FLINK-3332] Add Cassandra connector

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/1771
  
    You are correct that every CassandraSink instance opens a separate connection to the cluster. I don't see how we could avoid this without using Singletons, which should be avoided.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63675917
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.Session;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
    + * database.
    + * <p/>
    + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
    + */
    +public class CassandraCommitter extends CheckpointCommitter {
    +	private ClusterBuilder builder;
    +	private transient Cluster cluster;
    +	private transient Session session;
    +
    +	private String keySpace = "flink_auxiliary";
    +	private String table = "checkpoints_";
    +
    +	private transient PreparedStatement deleteStatement;
    +	private transient PreparedStatement updateStatement;
    +	private transient PreparedStatement selectStatement;
    +
    +	public CassandraCommitter(ClusterBuilder builder) {
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	public CassandraCommitter(ClusterBuilder builder, String keySpace) {
    +		this(builder);
    +		this.keySpace = keySpace;
    +	}
    +
    +	/**
    +	 * Internally used to set the job ID after instantiation.
    +	 *
    +	 * @param id
    +	 * @throws Exception
    +	 */
    +	public void setJobId(String id) throws Exception {
    +		super.setJobId(id);
    +		table += id;
    +	}
    +
    +	/**
    +	 * Generates the necessary tables to store information.
    +	 *
    +	 * @return
    +	 * @throws Exception
    +	 */
    +	@Override
    +	public void createResource() throws Exception {
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", keySpace));
    +		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table));
    +
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    +		}
    +		try {
    +			cluster.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing cluster.", e);
    +		}
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		if (builder == null) {
    +			throw new RuntimeException("No ClusterBuilder was set.");
    +		}
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		deleteStatement = session.prepare(String.format("DELETE FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +		updateStatement = session.prepare(String.format("UPDATE %s.%s set checkpoint_id=? where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +		selectStatement = session.prepare(String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +
    +		session.execute(String.format("INSERT INTO %s.%s (sink_id, sub_id, checkpoint_id) values ('%s', %d, " + -1 + ");", keySpace, table, operatorId, subtaskId));
    +	}
    +
    +	@Override
    +	public void close() throws Exception {
    +		session.executeAsync(deleteStatement.bind());
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    +		}
    +		try {
    +			cluster.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing cluster.", e);
    +		}
    +	}
    +
    +	@Override
    +	public void commitCheckpoint(long checkpointID) {
    +		session.execute(updateStatement.bind(checkpointID));
    --- End diff --
    
    What happens if the `execute` call fails? Can we check the `ResultSet` for the success of the operation? If the commit did not succeed, then I think we should throw an exception.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63855468
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java ---
    @@ -0,0 +1,192 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.runtime.io.disk.InputViewIterator;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.HashSet;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.UUID;
    +
    +/**
    + * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing
    + * mechanism and can provide exactly-once guarantees; depending on the storage backend and sink/committer implementation.
    + * <p/>
    + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
    + * checkpoint is completed.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public abstract class GenericAtLeastOnceSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
    +	protected static final Logger LOG = LoggerFactory.getLogger(GenericAtLeastOnceSink.class);
    +	private final CheckpointCommitter committer;
    +	private transient AbstractStateBackend.CheckpointStateOutputView out;
    +	protected final TypeSerializer<IN> serializer;
    +	private final String id;
    +
    +	private ExactlyOnceState state = new ExactlyOnceState();
    +
    +	public GenericAtLeastOnceSink(CheckpointCommitter committer, TypeSerializer<IN> serializer, String jobID) throws Exception {
    +		this.committer = committer;
    +		this.serializer = serializer;
    +		this.id = UUID.randomUUID().toString();
    +		this.committer.setJobId(jobID);
    +		this.committer.createResource();
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		committer.setOperatorId(id);
    +		committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
    +		committer.open();
    +	}
    +
    +	public void close() throws Exception {
    +		committer.close();
    +	}
    +
    +	/**
    +	 * Saves a handle in the state.
    +	 *
    +	 * @param checkpointId
    +	 * @throws IOException
    +	 */
    +	private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {
    +		//only add handle if a new OperatorState was created since the last snapshot
    +		if (out != null) {
    +			StateHandle<DataInputView> handle = out.closeAndGetHandle();
    +			if (state.pendingHandles.containsKey(checkpointId)) {
    +				//we already have a checkpoint stored for that ID that may have been partially written,
    +				//so we discard this "alternate version" and use the stored checkpoint
    +				handle.discardState();
    +			} else {
    +				state.pendingHandles.put(checkpointId, new Tuple2<>(timestamp, handle));
    +			}
    +			out = null;
    +		}
    +	}
    +
    +	@Override
    +	public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
    +		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
    +		saveHandleInState(checkpointId, timestamp);
    +		taskState.setFunctionState(state);
    +		return taskState;
    +	}
    +
    +	@Override
    +	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
    +		super.restoreState(state, recoveryTimestamp);
    +		this.state = (ExactlyOnceState) state.getFunctionState();
    +
    +		out = null;
    +	}
    +
    +	@Override
    +	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    +		super.notifyOfCompletedCheckpoint(checkpointId);
    +
    +		synchronized (state.pendingHandles) {
    +			Set<Long> pastCheckpointIds = state.pendingHandles.keySet();
    +			Set<Long> checkpointsToRemove = new HashSet<>();
    +			for (Long pastCheckpointId : pastCheckpointIds) {
    +				if (pastCheckpointId <= checkpointId) {
    +					if (!committer.isCheckpointCommitted(pastCheckpointId)) {
    --- End diff --
    
    Maybe we should do this for the `CassandraCommitter` implementation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by alkagin <gi...@git.apache.org>.
Github user alkagin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r56825863
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java ---
    @@ -0,0 +1,197 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.runtime.io.disk.InputViewIterator;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.HashSet;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.UUID;
    +
    +/**
    + * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing
    + * mechanism and can provide exactly-once guarantees; depending on the storage backend and sink/committer implementation.
    + * <p/>
    + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
    + * checkpoint is completed.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public abstract class GenericAtLeastOnceSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
    --- End diff --
    
    Ok, I misunderstood CassandraIdempotentExactlyOnceSink's inheritance.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58562221
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.batch.connectors.cassandra.example;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.Cluster.Builder;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
    +import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
    +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
    +
    +import java.util.ArrayList;
    +
    +public class BatchExample {
    +	private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);";
    +	private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;";
    +
    +	/*
    +	 *	table script: "CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));"
    +	 */
    --- End diff --
    
    I would remove this comment and move the query to the class javadocs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1771#issuecomment-199298767
  
    Just pushed the following changes:
    * Fixed AT_LEAST_ONCE type
    * CassandraInputFormat.close properly propagates exceptions in close()
    * cassandra pom now uses guava.version attribute
    * fixed NotSerializableException For CassandraAtLeastOnceTuple-/PojoSink


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58561402
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java ---
    @@ -0,0 +1,136 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.BoundStatement;
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * Sink that emits its input elements into a Cassandra database. This sink stores incoming records within a 
    + * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to cassandra
    + * if a checkpoint is completed.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericAtLeastOnceSink<IN> {
    +	protected transient Cluster cluster;
    +	protected transient Session session;
    +
    +	private final String insertQuery;
    +	private transient PreparedStatement preparedStatement;
    +
    +	private transient Throwable exception = null;
    +	private transient FutureCallback<ResultSet> callback;
    +
    +	private ClusterBuilder builder;
    +
    +	private int updatesSent = 0;
    +	private AtomicInteger updatesConfirmed = new AtomicInteger(0);
    +
    +	private transient Object[] fields;
    +
    +	protected CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, String jobID, CheckpointCommitter committer) throws Exception {
    +		super(committer, serializer, jobID);
    +		this.insertQuery = insertQuery;
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	public void open() throws Exception {
    +		super.open();
    +		if (!getRuntimeContext().isCheckpointingEnabled()) {
    +			throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
    +		}
    +		this.callback = new FutureCallback<ResultSet>() {
    +			@Override
    +			public void onSuccess(ResultSet resultSet) {
    +				updatesConfirmed.incrementAndGet();
    +			}
    +
    +			@Override
    +			public void onFailure(Throwable throwable) {
    +				exception = throwable;
    --- End diff --
    
    Error logging here as well


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r59004103
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
    @@ -0,0 +1,126 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.Session;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
    + * database.
    + * <p/>
    + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
    + */
    +public class CassandraCommitter extends CheckpointCommitter {
    +	private ClusterBuilder builder;
    +	private transient Cluster cluster;
    +	private transient Session session;
    +
    +	private static final String KEYSPACE = "flink_auxiliary";
    +	private String TABLE = "checkpoints_";
    +
    +	private transient PreparedStatement deleteStatement;
    +	private transient PreparedStatement updateStatement;
    +	private transient PreparedStatement selectStatement;
    +
    +	public CassandraCommitter(ClusterBuilder builder) {
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	/**
    +	 * Internally used to set the job ID after instantiation.
    +	 *
    +	 * @param id
    +	 * @throws Exception
    +	 */
    +	public void setJobId(String id) throws Exception {
    +		super.setJobId(id);
    +		TABLE += id;
    +	}
    +
    +	/**
    +	 * Generates the necessary tables to store information.
    +	 *
    +	 * @return
    +	 * @throws Exception
    +	 */
    +	@Override
    +	public void createResource() throws Exception {
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", KEYSPACE));
    +		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", KEYSPACE, TABLE));
    --- End diff --
    
    fixed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r64217845
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java ---
    @@ -0,0 +1,192 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.runtime.io.disk.InputViewIterator;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.HashSet;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.UUID;
    +
    +/**
    + * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing
    + * mechanism and can provide exactly-once guarantees; depending on the storage backend and sink/committer implementation.
    + * <p/>
    + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
    + * checkpoint is completed.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public abstract class GenericAtLeastOnceSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
    +	protected static final Logger LOG = LoggerFactory.getLogger(GenericAtLeastOnceSink.class);
    +	private final CheckpointCommitter committer;
    +	private transient AbstractStateBackend.CheckpointStateOutputView out;
    +	protected final TypeSerializer<IN> serializer;
    +	private final String id;
    +
    +	private ExactlyOnceState state = new ExactlyOnceState();
    +
    +	public GenericAtLeastOnceSink(CheckpointCommitter committer, TypeSerializer<IN> serializer, String jobID) throws Exception {
    +		this.committer = committer;
    +		this.serializer = serializer;
    +		this.id = UUID.randomUUID().toString();
    +		this.committer.setJobId(jobID);
    +		this.committer.createResource();
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		committer.setOperatorId(id);
    +		committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
    +		committer.open();
    +	}
    +
    +	public void close() throws Exception {
    +		committer.close();
    +	}
    +
    +	/**
    +	 * Saves a handle in the state.
    +	 *
    +	 * @param checkpointId
    +	 * @throws IOException
    +	 */
    +	private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {
    +		//only add handle if a new OperatorState was created since the last snapshot
    +		if (out != null) {
    +			StateHandle<DataInputView> handle = out.closeAndGetHandle();
    +			if (state.pendingHandles.containsKey(checkpointId)) {
    +				//we already have a checkpoint stored for that ID that may have been partially written,
    +				//so we discard this "alternate version" and use the stored checkpoint
    +				handle.discardState();
    +			} else {
    +				state.pendingHandles.put(checkpointId, new Tuple2<>(timestamp, handle));
    +			}
    +			out = null;
    +		}
    +	}
    +
    +	@Override
    +	public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
    +		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
    +		saveHandleInState(checkpointId, timestamp);
    +		taskState.setFunctionState(state);
    +		return taskState;
    +	}
    +
    +	@Override
    +	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
    +		super.restoreState(state, recoveryTimestamp);
    +		this.state = (ExactlyOnceState) state.getFunctionState();
    +
    +		out = null;
    +	}
    +
    +	@Override
    +	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    +		super.notifyOfCompletedCheckpoint(checkpointId);
    +
    +		synchronized (state.pendingHandles) {
    +			Set<Long> pastCheckpointIds = state.pendingHandles.keySet();
    +			Set<Long> checkpointsToRemove = new HashSet<>();
    +			for (Long pastCheckpointId : pastCheckpointIds) {
    +				if (pastCheckpointId <= checkpointId) {
    +					if (!committer.isCheckpointCommitted(pastCheckpointId)) {
    --- End diff --
    
    I've implemented this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by alkagin <gi...@git.apache.org>.
Github user alkagin commented on the pull request:

    https://github.com/apache/flink/pull/1771#issuecomment-194294832
  
    Great work :+1: 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1771#issuecomment-206309053
  
    @rmetzger addressed all your comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by alkagin <gi...@git.apache.org>.
Github user alkagin commented on the pull request:

    https://github.com/apache/flink/pull/1771#issuecomment-217159043
  
    Hi guys :) any update on this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1771#issuecomment-199229037
  
    @alkagin Thank you for reporting this, will look into it. I think i may already know how to fix it...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58556585
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
    @@ -0,0 +1,313 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSink;
    +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.operators.ChainingStrategy;
    +import org.apache.flink.streaming.api.transformations.SinkTransformation;
    +import org.apache.flink.streaming.api.transformations.StreamTransformation;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +import java.util.UUID;
    +
    +/**
    + * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
    + *
    + * @param <IN> input type
    + */
    +public class CassandraSink<IN> {
    +	private static final String jobID = UUID.randomUUID().toString().replace("-", "_");
    +	private final boolean useDataStreamSink;
    +	private DataStreamSink<IN> sink1;
    +	private SingleOutputStreamOperator<IN> sink2;
    +
    +	private CassandraSink(DataStreamSink<IN> sink) {
    +		sink1 = sink;
    +		useDataStreamSink = true;
    +	}
    +
    +	private CassandraSink(SingleOutputStreamOperator<IN> sink) {
    +		sink2 = sink;
    +		useDataStreamSink = false;
    +	}
    +
    +	private SinkTransformation<IN> getSinkTransformation() {
    +		return sink1.getTransformation();
    +	}
    +
    +	private StreamTransformation<IN> getStreamTransformation() {
    +		return sink2.getTransformation();
    +	}
    +
    +	/**
    +	 * Sets the name of this sink. This name is
    +	 * used by the visualization and logging during runtime.
    +	 *
    +	 * @return The named sink.
    +	 */
    +	public CassandraSink<IN> name(String name) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setName(name);
    +		} else {
    +			getStreamTransformation().setName(name);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets an ID for this operator.
    +	 * <p/>
    +	 * <p>The specified ID is used to assign the same operator ID across job
    +	 * submissions (for example when starting a job from a savepoint).
    +	 * <p/>
    +	 * <p><strong>Important</strong>: this ID needs to be unique per
    +	 * transformation and job. Otherwise, job submission will fail.
    +	 *
    +	 * @param uid The unique user-specified ID of this transformation.
    +	 * @return The operator with the specified ID.
    +	 */
    +	public CassandraSink<IN> uid(String uid) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setUid(uid);
    +		} else {
    +			getStreamTransformation().setUid(uid);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets the parallelism for this sink. The degree must be higher than zero.
    +	 *
    +	 * @param parallelism The parallelism for this sink.
    +	 * @return The sink with set parallelism.
    +	 */
    +	public CassandraSink<IN> setParallelism(int parallelism) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setParallelism(parallelism);
    +		} else {
    +			getStreamTransformation().setParallelism(parallelism);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Turns off chaining for this operator so thread co-location will not be
    +	 * used as an optimization.
    +	 * <p/>
    +	 * <p/>
    +	 * Chaining can be turned off for the whole
    +	 * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()}
    +	 * however it is not advised for performance considerations.
    +	 *
    +	 * @return The sink with chaining disabled
    +	 */
    +	public CassandraSink<IN> disableChaining() {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER);
    +		} else {
    +			getStreamTransformation().setChainingStrategy(ChainingStrategy.NEVER);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets the slot sharing group of this operation. Parallel instances of
    +	 * operations that are in the same slot sharing group will be co-located in the same
    +	 * TaskManager slot, if possible.
    +	 * <p/>
    +	 * <p>Operations inherit the slot sharing group of input operations if all input operations
    +	 * are in the same slot sharing group and no slot sharing group was explicitly specified.
    +	 * <p/>
    +	 * <p>Initially an operation is in the default slot sharing group. An operation can be put into
    +	 * the default group explicitly by setting the slot sharing group to {@code "default"}.
    +	 *
    +	 * @param slotSharingGroup The slot sharing group name.
    +	 */
    +	public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
    +		} else {
    +			getStreamTransformation().setSlotSharingGroup(slotSharingGroup);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Writes a DataStream into a Cassandra database.
    +	 *
    +	 * @param input input DataStream
    +	 * @param <IN>  input type
    +	 * @return CassandraSinkBuilder, to further configure the sink
    +	 */
    +	public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
    +		if (input.getType() instanceof TupleTypeInfo) {
    +			DataStream<T> tupleInput = (DataStream<T>) input;
    +			return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
    +		} else {
    +			return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
    +		}
    +	}
    +
    +	public abstract static class CassandraSinkBuilder<IN> {
    +		protected final DataStream<IN> input;
    +		protected final TypeSerializer<IN> serializer;
    +		protected final TypeInformation<IN> typeInfo;
    +		protected ClusterBuilder builder;
    +		protected String query;
    +		protected CheckpointCommitter committer;
    +		protected boolean isWriteAheadLogEnabled;
    +
    +		public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
    +			this.input = input;
    +			this.typeInfo = typeInfo;
    +			this.serializer = serializer;
    +		}
    +
    +		/**
    +		 * Sets the query that is to be executed for every record.
    +		 *
    +		 * @param query query to use
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setQuery(String query) {
    +			this.query = query;
    +			return this;
    +		}
    +
    +		/**
    +		 * Sets the cassandra host to connect to.
    +		 *
    +		 * @param host host to connect to
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setHost(String host) {
    +			return setHost(host, 9042);
    +		}
    +
    +		/**
    +		 * Sets the cassandra host/port to connect to.
    +		 *
    +		 * @param host host to connect to
    +		 * @param port port to connect to
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setHost(final String host, final int port) {
    +			builder = new ClusterBuilder() {
    --- End diff --
    
    I wonder whether we should throw an exception if the builder is already set. Otherwise, if somebody first sets a builder and then a host the results will be very misleading.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58548581
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java ---
    @@ -0,0 +1,123 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.batch.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.google.common.base.Strings;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import org.apache.flink.api.common.io.RichOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +
    +/**
    + * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} into Apache Cassandra.
    + *
    + * @param <OUT> type of Tuple
    + */
    +public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
    +	private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);
    +
    +	private final String insertQuery;
    +	private final ClusterBuilder builder;
    +
    +	private transient Cluster cluster;
    +	private transient Session session;
    +	private transient PreparedStatement prepared;
    +	private transient FutureCallback<ResultSet> callback;
    +	private transient Throwable exception = null;
    +
    +	public CassandraOutputFormat(String insertQuery, ClusterBuilder builder) {
    +		if (Strings.isNullOrEmpty(insertQuery)) {
    +			throw new IllegalArgumentException("insertQuery cannot be null or empty");
    +		}
    +		if (builder == null) {
    +			throw new IllegalArgumentException("Builder cannot be null.");
    +		}
    +		this.insertQuery = insertQuery;
    +		this.builder = builder;
    +	}
    +
    +	@Override
    +	public void configure(Configuration parameters) {
    +		this.cluster = builder.getCluster();
    +	}
    +
    +	/**
    +	 * Opens a Session to Cassandra and initializes the prepared statement.
    +	 *
    +	 * @param taskNumber The number of the parallel instance.
    +	 * @throws IOException Thrown, if the output could not be opened due to an
    +	 *                     I/O problem.
    +	 */
    +	@Override
    +	public void open(int taskNumber, int numTasks) throws IOException {
    +		this.session = cluster.connect();
    +		this.prepared = session.prepare(insertQuery);
    +		this.callback = new FutureCallback<ResultSet>() {
    +			@Override
    +			public void onSuccess(ResultSet ignored) {
    +			}
    +
    +			@Override
    +			public void onFailure(Throwable t) {
    +				exception = t;
    +			}
    +		};
    +	}
    +
    +	@Override
    +	public void writeRecord(OUT record) throws IOException {
    +		if (exception != null) {
    +			throw new IOException("write record failed", exception);
    +		}
    +
    +		Object[] fields = new Object[record.getArity()];
    +		for (int i = 0; i < record.getArity(); i++) {
    +			fields[i] = record.getField(i);
    +		}
    +		ResultSetFuture result = session.executeAsync(prepared.bind(fields));
    +		Futures.addCallback(result, callback);
    +	}
    +
    +	/**
    +	 * Closes all resources used.
    +	 */
    +	@Override
    +	public void close() throws IOException {
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.info("Inputformat couldn't be closed - " + e.getMessage());
    --- End diff --
    
    I would make this a WARN and forward the cause to the log entry


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63675532
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java ---
    @@ -0,0 +1,192 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.runtime.io.disk.InputViewIterator;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.HashSet;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.UUID;
    +
    +/**
    + * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing
    + * mechanism and can provide exactly-once guarantees; depending on the storage backend and sink/committer implementation.
    + * <p/>
    + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
    + * checkpoint is completed.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public abstract class GenericAtLeastOnceSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
    +	protected static final Logger LOG = LoggerFactory.getLogger(GenericAtLeastOnceSink.class);
    +	private final CheckpointCommitter committer;
    +	private transient AbstractStateBackend.CheckpointStateOutputView out;
    +	protected final TypeSerializer<IN> serializer;
    +	private final String id;
    +
    +	private ExactlyOnceState state = new ExactlyOnceState();
    +
    +	public GenericAtLeastOnceSink(CheckpointCommitter committer, TypeSerializer<IN> serializer, String jobID) throws Exception {
    +		this.committer = committer;
    +		this.serializer = serializer;
    +		this.id = UUID.randomUUID().toString();
    +		this.committer.setJobId(jobID);
    +		this.committer.createResource();
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		committer.setOperatorId(id);
    +		committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
    +		committer.open();
    +	}
    +
    +	public void close() throws Exception {
    +		committer.close();
    +	}
    +
    +	/**
    +	 * Saves a handle in the state.
    +	 *
    +	 * @param checkpointId
    +	 * @throws IOException
    +	 */
    +	private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {
    +		//only add handle if a new OperatorState was created since the last snapshot
    +		if (out != null) {
    +			StateHandle<DataInputView> handle = out.closeAndGetHandle();
    +			if (state.pendingHandles.containsKey(checkpointId)) {
    +				//we already have a checkpoint stored for that ID that may have been partially written,
    +				//so we discard this "alternate version" and use the stored checkpoint
    +				handle.discardState();
    +			} else {
    +				state.pendingHandles.put(checkpointId, new Tuple2<>(timestamp, handle));
    +			}
    +			out = null;
    +		}
    +	}
    +
    +	@Override
    +	public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
    +		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
    +		saveHandleInState(checkpointId, timestamp);
    +		taskState.setFunctionState(state);
    +		return taskState;
    +	}
    +
    +	@Override
    +	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
    +		super.restoreState(state, recoveryTimestamp);
    +		this.state = (ExactlyOnceState) state.getFunctionState();
    +
    +		out = null;
    +	}
    +
    +	@Override
    +	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    +		super.notifyOfCompletedCheckpoint(checkpointId);
    +
    +		synchronized (state.pendingHandles) {
    +			Set<Long> pastCheckpointIds = state.pendingHandles.keySet();
    +			Set<Long> checkpointsToRemove = new HashSet<>();
    +			for (Long pastCheckpointId : pastCheckpointIds) {
    +				if (pastCheckpointId <= checkpointId) {
    +					if (!committer.isCheckpointCommitted(pastCheckpointId)) {
    --- End diff --
    
    Does this always trigger a lookup? Wouldn't it be more efficient to fetch the last committed checkpoint id and then use this value for the comparison?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #1771: [FLINK-3311/FLINK-3332] Add Cassandra connector

Posted by f1yegor <gi...@git.apache.org>.
Github user f1yegor commented on the issue:

    https://github.com/apache/flink/pull/1771
  
    Looking at the code of `CassandraSinkBase` I assume that every new sink will create own session to the cluster. 
    I'm creating arount 1000 streams and sinks for them, so cassandra cluster would be saturated with openning connections.
    Cassandra session could manage several connections via one session. 
    
    Could you confirm that this isn't an issue with the implementation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58571353
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java ---
    @@ -0,0 +1,88 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra.example;
    +
    +import com.datastax.driver.core.Cluster;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
    +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
    +
    +import java.util.UUID;
    +
    +public class CassandraTupleWriteAheadSinkExample {
    +	public static void main(String[] args) throws Exception {
    +
    +		class MySource implements SourceFunction<Tuple2<String, Integer>>, Checkpointed<Integer> {
    --- End diff --
    
    I would move the class out of the main method


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1771#issuecomment-220975120
  
    I finally was able to fix the restart issue. there were 2 massive bugs in the CassandraCommitter:
    - within open() the checkpoint entry was always overridden
    - within close() the checkpoint entry was always deleted
    
    in addition i have made the following changes:
    - renamed GenericAtLeastOnceSink to GenericWriteAheadSink
    - implemented a caching of the last committed checkpointID in the CassandraCommitter
    
    It's rather obvious that more tests are required.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63855104
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.Session;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
    + * database.
    + * <p/>
    + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
    + */
    +public class CassandraCommitter extends CheckpointCommitter {
    +	private ClusterBuilder builder;
    +	private transient Cluster cluster;
    +	private transient Session session;
    +
    +	private String keySpace = "flink_auxiliary";
    +	private String table = "checkpoints_";
    +
    +	private transient PreparedStatement deleteStatement;
    +	private transient PreparedStatement updateStatement;
    +	private transient PreparedStatement selectStatement;
    +
    +	public CassandraCommitter(ClusterBuilder builder) {
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	public CassandraCommitter(ClusterBuilder builder, String keySpace) {
    +		this(builder);
    +		this.keySpace = keySpace;
    +	}
    +
    +	/**
    +	 * Internally used to set the job ID after instantiation.
    +	 *
    +	 * @param id
    +	 * @throws Exception
    +	 */
    +	public void setJobId(String id) throws Exception {
    +		super.setJobId(id);
    +		table += id;
    +	}
    +
    +	/**
    +	 * Generates the necessary tables to store information.
    +	 *
    +	 * @return
    +	 * @throws Exception
    +	 */
    +	@Override
    +	public void createResource() throws Exception {
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", keySpace));
    +		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table));
    +
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    +		}
    +		try {
    +			cluster.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing cluster.", e);
    +		}
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		if (builder == null) {
    +			throw new RuntimeException("No ClusterBuilder was set.");
    +		}
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		deleteStatement = session.prepare(String.format("DELETE FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +		updateStatement = session.prepare(String.format("UPDATE %s.%s set checkpoint_id=? where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +		selectStatement = session.prepare(String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
    +
    +		session.execute(String.format("INSERT INTO %s.%s (sink_id, sub_id, checkpoint_id) values ('%s', %d, " + -1 + ");", keySpace, table, operatorId, subtaskId));
    +	}
    +
    +	@Override
    +	public void close() throws Exception {
    +		session.executeAsync(deleteStatement.bind());
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    --- End diff --
    
    Why swallowing the exception here? Shouldn't it be propagated to the executing task?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63673048
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java ---
    @@ -0,0 +1,192 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.runtime.io.disk.InputViewIterator;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.HashSet;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.UUID;
    +
    +/**
    + * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing
    + * mechanism and can provide exactly-once guarantees; depending on the storage backend and sink/committer implementation.
    + * <p/>
    + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
    + * checkpoint is completed.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public abstract class GenericAtLeastOnceSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
    +	protected static final Logger LOG = LoggerFactory.getLogger(GenericAtLeastOnceSink.class);
    +	private final CheckpointCommitter committer;
    +	private transient AbstractStateBackend.CheckpointStateOutputView out;
    +	protected final TypeSerializer<IN> serializer;
    +	private final String id;
    +
    +	private ExactlyOnceState state = new ExactlyOnceState();
    +
    +	public GenericAtLeastOnceSink(CheckpointCommitter committer, TypeSerializer<IN> serializer, String jobID) throws Exception {
    +		this.committer = committer;
    +		this.serializer = serializer;
    +		this.id = UUID.randomUUID().toString();
    +		this.committer.setJobId(jobID);
    +		this.committer.createResource();
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		committer.setOperatorId(id);
    +		committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
    +		committer.open();
    +	}
    +
    +	public void close() throws Exception {
    +		committer.close();
    +	}
    +
    +	/**
    +	 * Saves a handle in the state.
    +	 *
    +	 * @param checkpointId
    +	 * @throws IOException
    +	 */
    +	private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {
    +		//only add handle if a new OperatorState was created since the last snapshot
    +		if (out != null) {
    +			StateHandle<DataInputView> handle = out.closeAndGetHandle();
    +			if (state.pendingHandles.containsKey(checkpointId)) {
    +				//we already have a checkpoint stored for that ID that may have been partially written,
    +				//so we discard this "alternate version" and use the stored checkpoint
    +				handle.discardState();
    +			} else {
    +				state.pendingHandles.put(checkpointId, new Tuple2<>(timestamp, handle));
    +			}
    +			out = null;
    +		}
    +	}
    +
    +	@Override
    +	public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
    +		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
    +		saveHandleInState(checkpointId, timestamp);
    +		taskState.setFunctionState(state);
    +		return taskState;
    +	}
    +
    +	@Override
    +	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
    +		super.restoreState(state, recoveryTimestamp);
    +		this.state = (ExactlyOnceState) state.getFunctionState();
    +
    +		out = null;
    +	}
    +
    +	@Override
    +	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    +		super.notifyOfCompletedCheckpoint(checkpointId);
    +
    +		synchronized (state.pendingHandles) {
    +			Set<Long> pastCheckpointIds = state.pendingHandles.keySet();
    +			Set<Long> checkpointsToRemove = new HashSet<>();
    +			for (Long pastCheckpointId : pastCheckpointIds) {
    +				if (pastCheckpointId <= checkpointId) {
    +					if (!committer.isCheckpointCommitted(pastCheckpointId)) {
    +						Tuple2<Long, StateHandle<DataInputView>> handle = state.pendingHandles.get(pastCheckpointId);
    +						DataInputView in = handle.f1.getState(getUserCodeClassloader());
    +						sendValues(new ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(in, serializer), serializer), handle.f0);
    +						committer.commitCheckpoint(pastCheckpointId);
    +					}
    +					checkpointsToRemove.add(pastCheckpointId);
    +				}
    +			}
    +			for (Long toRemove : checkpointsToRemove) {
    +				Tuple2<Long, StateHandle<DataInputView>> handle = state.pendingHandles.get(toRemove);
    +				state.pendingHandles.remove(toRemove);
    +				handle.f1.discardState();
    +			}
    +		}
    +	}
    +
    +
    +	/**
    +	 * Write the given element into the backend.
    +	 *
    +	 * @param value value to be written
    +	 * @throws Exception
    +	 */
    +	protected abstract void sendValues(Iterable<IN> value, long timestamp) throws Exception;
    +
    +	@Override
    +	public void processElement(StreamRecord<IN> element) throws Exception {
    +		IN value = element.getValue();
    +		//generate initial operator state
    +		if (out == null) {
    +			out = getStateBackend().createCheckpointStateOutputView(0, 0);
    +		}
    +		serializer.serialize(value, out);
    +	}
    +
    +	@Override
    +	public void processWatermark(Watermark mark) throws Exception {
    +		//don't do anything, since we are a sink
    +	}
    +
    +	/**
    +	 * This state is used to keep a list of all StateHandles (essentially references to past OperatorStates) that were
    +	 * used since the last completed checkpoint.
    +	 **/
    +	public class ExactlyOnceState implements StateHandle<Serializable> {
    --- End diff --
    
    Serial version uid missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58562121
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.batch.connectors.cassandra.example;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.Cluster.Builder;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
    +import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
    +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
    +
    +import java.util.ArrayList;
    +
    +public class BatchExample {
    --- End diff --
    
    Javadocs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58562019
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java ---
    @@ -0,0 +1,31 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +
    +import java.io.Serializable;
    +
    +public abstract class ClusterBuilder implements Serializable {
    --- End diff --
    
    Since this is a user facing class, I would add some javadocs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58562758
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
    @@ -0,0 +1,313 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSink;
    +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.operators.ChainingStrategy;
    +import org.apache.flink.streaming.api.transformations.SinkTransformation;
    +import org.apache.flink.streaming.api.transformations.StreamTransformation;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +import java.util.UUID;
    +
    +/**
    + * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
    + *
    + * @param <IN> input type
    + */
    +public class CassandraSink<IN> {
    +	private static final String jobID = UUID.randomUUID().toString().replace("-", "_");
    --- End diff --
    
    why should it be named sinkID when it is supposed to be a job-specific ID?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by alkagin <gi...@git.apache.org>.
Github user alkagin commented on the pull request:

    https://github.com/apache/flink/pull/1771#issuecomment-199224407
  
    Hi, 
    I am testing the Cassandra connector and I may have found a bug using CassandraSink-DSL. During the execution it throws `java.io.NotSerializableException: org.apache.flink.streaming.connectors.cassandra.CassandraSink$CassandraTupleSinkBuilder` CassandraSinkBuilder doesn't implement java.io.Serializable. I forked and added java.io.serializable to CassandraSinkBuilder, after that it throws `java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator`.
    Not using the DSL it works as expected. 
    Job source code: https://gist.github.com/alkagin/6620ebdd10ef6bb47eee
    
    Hope it helps.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #1771: [FLINK-3311/FLINK-3332] Add Cassandra connector

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/1771
  
    Thank you for testing the cassandra connector.
    
    I'll merge the pull request now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1771#issuecomment-217160848
  
    I'll test the PR again


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63675682
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java ---
    @@ -0,0 +1,192 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.runtime.io.disk.InputViewIterator;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.HashSet;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.UUID;
    +
    +/**
    + * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing
    + * mechanism and can provide exactly-once guarantees; depending on the storage backend and sink/committer implementation.
    + * <p/>
    + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
    + * checkpoint is completed.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public abstract class GenericAtLeastOnceSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
    +	protected static final Logger LOG = LoggerFactory.getLogger(GenericAtLeastOnceSink.class);
    +	private final CheckpointCommitter committer;
    +	private transient AbstractStateBackend.CheckpointStateOutputView out;
    +	protected final TypeSerializer<IN> serializer;
    +	private final String id;
    +
    +	private ExactlyOnceState state = new ExactlyOnceState();
    +
    +	public GenericAtLeastOnceSink(CheckpointCommitter committer, TypeSerializer<IN> serializer, String jobID) throws Exception {
    +		this.committer = committer;
    +		this.serializer = serializer;
    +		this.id = UUID.randomUUID().toString();
    +		this.committer.setJobId(jobID);
    +		this.committer.createResource();
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		committer.setOperatorId(id);
    +		committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
    +		committer.open();
    +	}
    +
    +	public void close() throws Exception {
    +		committer.close();
    +	}
    +
    +	/**
    +	 * Saves a handle in the state.
    +	 *
    +	 * @param checkpointId
    +	 * @throws IOException
    +	 */
    +	private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {
    +		//only add handle if a new OperatorState was created since the last snapshot
    +		if (out != null) {
    +			StateHandle<DataInputView> handle = out.closeAndGetHandle();
    +			if (state.pendingHandles.containsKey(checkpointId)) {
    +				//we already have a checkpoint stored for that ID that may have been partially written,
    +				//so we discard this "alternate version" and use the stored checkpoint
    +				handle.discardState();
    +			} else {
    +				state.pendingHandles.put(checkpointId, new Tuple2<>(timestamp, handle));
    +			}
    +			out = null;
    +		}
    +	}
    +
    +	@Override
    +	public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
    +		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
    +		saveHandleInState(checkpointId, timestamp);
    +		taskState.setFunctionState(state);
    +		return taskState;
    +	}
    +
    +	@Override
    +	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
    +		super.restoreState(state, recoveryTimestamp);
    +		this.state = (ExactlyOnceState) state.getFunctionState();
    +
    +		out = null;
    +	}
    +
    +	@Override
    +	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    +		super.notifyOfCompletedCheckpoint(checkpointId);
    +
    +		synchronized (state.pendingHandles) {
    +			Set<Long> pastCheckpointIds = state.pendingHandles.keySet();
    +			Set<Long> checkpointsToRemove = new HashSet<>();
    +			for (Long pastCheckpointId : pastCheckpointIds) {
    +				if (pastCheckpointId <= checkpointId) {
    +					if (!committer.isCheckpointCommitted(pastCheckpointId)) {
    +						Tuple2<Long, StateHandle<DataInputView>> handle = state.pendingHandles.get(pastCheckpointId);
    +						DataInputView in = handle.f1.getState(getUserCodeClassloader());
    +						sendValues(new ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(in, serializer), serializer), handle.f0);
    +						committer.commitCheckpoint(pastCheckpointId);
    --- End diff --
    
    Can it be that committing the checkpoint id can fail (with cassandra)? Is the commit checkpoint operation synchronous (CassandraCommitter)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #1771: [FLINK-3311/FLINK-3332] Add Cassandra connector

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1771


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #1771: [FLINK-3311/FLINK-3332] Add Cassandra connector

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/1771
  
    I've reviewed the connector again.
    The issues I've seen previously (failure on restart) are resolved.
    However, I found new issues:
    - The Cassandra Sink doesn't fail (at least not within 15 minutes) if Cassandra is not available anymore. Its probably just a configuration setting of the cassandra driver to fail after a certain amount of time.
    - We should probably introduce a (configurable) limit (nr. records / some gb's) for the write ahead log. It seemed to me, that due to the failed other instance, no checkpoints were able to complete anymore (because some of the cassandra sinks were stuck in the notifyCheckpointComplete()), while other's were accepting data into the WAL. This lead to a lot of data being written into the statebackend. I think the cassandra sink should stop at some point in such a situation.
    
    Also, I would like to test the exactly once behavior on a cluster more thoroughly. Currently, I've only tested whether the connector is properly failing and restoring, but I didn't test if the written data is actually correct.
    
    However, since the code seems to be working under normal operation, I would suggest to merge the connector now, and then file follow up JIRAs for the remaining issues.
    This makes collaboration and reviews easier and allows our users to help testing the cassandra connector.
    
    
    
    Some log:
    ```
    2016-06-03 12:28:36,478 ERROR org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink  - Error while sending value.
    com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
    	at com.datastax.driver.core.exceptions.UnavailableException.copy(UnavailableException.java:128)
    	at com.datastax.driver.core.Responses$Error.asException(Responses.java:114)
    	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:477)
    	at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
    	at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
    	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
    	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
    	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329)
    	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
    	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    	at java.lang.Thread.run(Thread.java:745)
    Caused by: com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
    	at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:50)
    	at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
    	at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:266)
    	at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:246)
    	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
    	... 11 more
    2016-06-03 12:28:57,473 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
    2016-06-03 12:28:57,487 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
    2016-06-03 12:29:02,939 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 2000 milliseconds
    2016-06-03 12:29:02,970 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 2000 milliseconds
    2016-06-03 12:29:12,945 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 4000 milliseconds
    2016-06-03 12:29:12,974 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 4000 milliseconds
    2016-06-03 12:29:17,947 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 8000 milliseconds
    2016-06-03 12:29:17,977 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 8000 milliseconds
    2016-06-03 12:29:28,481 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 16000 milliseconds
    2016-06-03 12:29:28,974 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 16000 milliseconds
    2016-06-03 12:29:44,482 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 32000 milliseconds
    2016-06-03 12:29:44,975 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 32000 milliseconds
    2016-06-03 12:30:16,482 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 64000 milliseconds
    2016-06-03 12:30:16,975 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 64000 milliseconds
    2016-06-03 12:31:20,483 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 128000 milliseconds
    2016-06-03 12:31:20,976 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 128000 milliseconds
    2016-06-03 12:33:28,484 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 256000 milliseconds
    2016-06-03 12:33:28,976 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 256000 milliseconds
    2016-06-03 12:37:44,484 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 512000 milliseconds
    2016-06-03 12:37:44,977 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 512000 milliseconds
    2016-06-03 12:46:16,485 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 600000 milliseconds
    2016-06-03 12:46:16,977 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 600000 milliseconds
    2016-06-03 12:46:54,906 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Source: Custom Source (1/2)
    2016-06-03 12:46:54,907 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/2) switched to CANCELING
    2016-06-03 12:46:54,907 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Source: Custom Source (1/2) (dec8d24e486ca9937739b7c6e07fbb05).
    2016-06-03 12:46:54,909 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Cassandra Sink (1/2)
    2016-06-03 12:46:54,909 INFO  org.apache.flink.runtime.taskmanager.Task                     - Cassandra Sink (1/2) switched to CANCELING
    2016-06-03 12:46:54,909 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Cassandra Sink (1/2) (96511ef6293a893b0ef35dd211aea2b9).
    2016-06-03 12:46:55,389 INFO  com.dataartisans.Job                                          - Received cancel in EventGenerator
    2016-06-03 12:46:55,392 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/2) switched to CANCELED
    2016-06-03 12:46:55,392 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (1/2)
    2016-06-03 12:46:55,394 INFO  org.apache.flink.yarn.YarnTaskManager                         - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source (dec8d24e486ca9937739b7c6e07fbb05)
    2016-06-03 12:47:24,911 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    
    2016-06-03 12:47:54,912 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    
    2016-06-03 12:48:24,913 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    
    2016-06-03 12:48:54,915 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    
    2016-06-03 12:49:24,916 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    
    2016-06-03 12:49:54,918 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    
    2016-06-03 12:50:24,919 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by alkagin <gi...@git.apache.org>.
Github user alkagin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r56807419
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
    @@ -0,0 +1,282 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSink;
    +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.operators.ChainingStrategy;
    +import org.apache.flink.streaming.api.transformations.SinkTransformation;
    +import org.apache.flink.streaming.api.transformations.StreamTransformation;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +import java.util.UUID;
    +
    +/**
    + * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
    + *
    + * @param <IN> input type
    + */
    +public class CassandraSink<IN> {
    +	private static final String jobID = UUID.randomUUID().toString().replace("-", "_");
    +	private final boolean useDataStreamSink;
    +	private DataStreamSink<IN> sink1;
    +	private SingleOutputStreamOperator<IN> sink2;
    +
    +	private CassandraSink(DataStreamSink<IN> sink) {
    +		sink1 = sink;
    +		useDataStreamSink = true;
    +	}
    +
    +	private CassandraSink(SingleOutputStreamOperator<IN> sink) {
    +		sink2 = sink;
    +		useDataStreamSink = false;
    +	}
    +
    +	private SinkTransformation<IN> getSinkTransformation() {
    +		return sink1.getTransformation();
    +	}
    +
    +	private StreamTransformation<IN> getStreamTransformation() {
    +		return sink2.getTransformation();
    +	}
    +
    +	/**
    +	 * Sets the name of this sink. This name is
    +	 * used by the visualization and logging during runtime.
    +	 *
    +	 * @return The named sink.
    +	 */
    +	public CassandraSink<IN> name(String name) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setName(name);
    +		} else {
    +			getStreamTransformation().setName(name);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets an ID for this operator.
    +	 * <p/>
    +	 * <p>The specified ID is used to assign the same operator ID across job
    +	 * submissions (for example when starting a job from a savepoint).
    +	 * <p/>
    +	 * <p><strong>Important</strong>: this ID needs to be unique per
    +	 * transformation and job. Otherwise, job submission will fail.
    +	 *
    +	 * @param uid The unique user-specified ID of this transformation.
    +	 * @return The operator with the specified ID.
    +	 */
    +	public CassandraSink<IN> uid(String uid) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setUid(uid);
    +		} else {
    +			getStreamTransformation().setUid(uid);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets the parallelism for this sink. The degree must be higher than zero.
    +	 *
    +	 * @param parallelism The parallelism for this sink.
    +	 * @return The sink with set parallelism.
    +	 */
    +	public CassandraSink<IN> setParallelism(int parallelism) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setParallelism(parallelism);
    +		} else {
    +			getStreamTransformation().setParallelism(parallelism);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Turns off chaining for this operator so thread co-location will not be
    +	 * used as an optimization.
    +	 * <p/>
    +	 * <p/>
    +	 * Chaining can be turned off for the whole
    +	 * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()}
    +	 * however it is not advised for performance considerations.
    +	 *
    +	 * @return The sink with chaining disabled
    +	 */
    +	public CassandraSink<IN> disableChaining() {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER);
    +		} else {
    +			getStreamTransformation().setChainingStrategy(ChainingStrategy.NEVER);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets the slot sharing group of this operation. Parallel instances of
    +	 * operations that are in the same slot sharing group will be co-located in the same
    +	 * TaskManager slot, if possible.
    +	 * <p/>
    +	 * <p>Operations inherit the slot sharing group of input operations if all input operations
    +	 * are in the same slot sharing group and no slot sharing group was explicitly specified.
    +	 * <p/>
    +	 * <p>Initially an operation is in the default slot sharing group. An operation can be put into
    +	 * the default group explicitly by setting the slot sharing group to {@code "default"}.
    +	 *
    +	 * @param slotSharingGroup The slot sharing group name.
    +	 */
    +	public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
    +		} else {
    +			getStreamTransformation().setSlotSharingGroup(slotSharingGroup);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Writes a DataStream into a Cassandra database.
    +	 *
    +	 * @param input input DataStream
    +	 * @param <IN>  input type
    +	 * @return CassandraSinkBuilder, to further configure the sink
    +	 */
    +	public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
    +		if (input.getType() instanceof TupleTypeInfo) {
    +			DataStream<T> tupleInput = (DataStream<T>) input;
    +			return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
    +		} else {
    +			throw new IllegalArgumentException("POJOs are currently not supported.");
    +		}
    +	}
    +
    +	public enum ConsistencyLevel {
    +		At_LEAST_ONCE,
    --- End diff --
    
    typo: AT_LEAST_ONCE


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58675302
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml ---
    @@ -0,0 +1,43 @@
    +################################################################################
    +#  Licensed to the Apache Software Foundation (ASF) under one
    +#  or more contributor license agreements.  See the NOTICE file
    +#  distributed with this work for additional information
    +#  regarding copyright ownership.  The ASF licenses this file
    +#  to you under the Apache License, Version 2.0 (the
    +#  "License"); you may not use this file except in compliance
    +#  with the License.  You may obtain a copy of the License at
    +#
    +#      http://www.apache.org/licenses/LICENSE-2.0
    +#
    +#  Unless required by applicable law or agreed to in writing, software
    +#  distributed under the License is distributed on an "AS IS" BASIS,
    +#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +#  See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +cluster_name: 'Test Cluster'
    +commitlog_sync: 'periodic'
    +commitlog_sync_period_in_ms: 10000
    +commitlog_segment_size_in_mb: 16
    +partitioner: 'org.apache.cassandra.dht.RandomPartitioner'
    +endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
    +commitlog_directory: $PATH\commit'
    +data_file_directories:
    +    - $PATH\data'
    +saved_caches_directory: $PATH\cache'
    --- End diff --
    
    the simple explanation would be that the EmbeddedCassandraService can resolve the path properly regardless of platform.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r58572834
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml ---
    @@ -0,0 +1,43 @@
    +################################################################################
    +#  Licensed to the Apache Software Foundation (ASF) under one
    +#  or more contributor license agreements.  See the NOTICE file
    +#  distributed with this work for additional information
    +#  regarding copyright ownership.  The ASF licenses this file
    +#  to you under the Apache License, Version 2.0 (the
    +#  "License"); you may not use this file except in compliance
    +#  with the License.  You may obtain a copy of the License at
    +#
    +#      http://www.apache.org/licenses/LICENSE-2.0
    +#
    +#  Unless required by applicable law or agreed to in writing, software
    +#  distributed under the License is distributed on an "AS IS" BASIS,
    +#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +#  See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +cluster_name: 'Test Cluster'
    +commitlog_sync: 'periodic'
    +commitlog_sync_period_in_ms: 10000
    +commitlog_segment_size_in_mb: 16
    +partitioner: 'org.apache.cassandra.dht.RandomPartitioner'
    +endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
    +commitlog_directory: $PATH\commit'
    +data_file_directories:
    +    - $PATH\data'
    +saved_caches_directory: $PATH\cache'
    --- End diff --
    
    I wonder why the path works on unix platforms. Only windows is using `\` as a separator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #1771: [FLINK-3311/FLINK-3332] Add Cassandra connector

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/1771
  
    The merging will probably be a bit delayed because there are some memory issues:
    
    ```
    -------------------------------------------------------
     T E S T S
    -------------------------------------------------------
    Running org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest
    06/13/2016 11:12:22	Job execution switched to status RUNNING.
    06/13/2016 11:12:22	Source: Collection Source -> Sink: Unnamed(1/1) switched to SCHEDULED 
    06/13/2016 11:12:22	Source: Collection Source -> Sink: Unnamed(1/1) switched to DEPLOYING 
    06/13/2016 11:12:23	Source: Collection Source -> Sink: Unnamed(1/1) switched to RUNNING 
    06/13/2016 11:12:27	Source: Collection Source -> Sink: Unnamed(1/1) switched to FINISHED 
    06/13/2016 11:12:27	Job execution switched to status FINISHED.
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CompactionExecutor:2"
    Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 150.02 sec - in org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CompactionExecutor:5"
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CompactionExecutor:4"
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "HintedHandoffManager:1"
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CompactionExecutor:7"
    ==============================================================================
    Maven produced no output for 300 seconds.
    ==============================================================================
    ==============================================================================
    The following Java processes are running (JPS)
    ==============================================================================
    2956 Launcher
    98454 Jps
    97148 surefirebooter7601064672285446339.jar
    ==============================================================================
    Printing stack trace of Java process 2956
    ==============================================================================
    2016-06-13 11:28:23
    Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed mode):
    ```
    
    I will take care of them!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by alkagin <gi...@git.apache.org>.
Github user alkagin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r61419752
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
    @@ -0,0 +1,316 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSink;
    +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.operators.ChainingStrategy;
    +import org.apache.flink.streaming.api.transformations.SinkTransformation;
    +import org.apache.flink.streaming.api.transformations.StreamTransformation;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
    + *
    + * @param <IN> input type
    + */
    +public class CassandraSink<IN> {
    +	private final boolean useDataStreamSink;
    +	private DataStreamSink<IN> sink1;
    +	private SingleOutputStreamOperator<IN> sink2;
    +
    +	private CassandraSink(DataStreamSink<IN> sink) {
    +		sink1 = sink;
    +		useDataStreamSink = true;
    +	}
    +
    +	private CassandraSink(SingleOutputStreamOperator<IN> sink) {
    +		sink2 = sink;
    +		useDataStreamSink = false;
    +	}
    +
    +	private SinkTransformation<IN> getSinkTransformation() {
    +		return sink1.getTransformation();
    +	}
    +
    +	private StreamTransformation<IN> getStreamTransformation() {
    +		return sink2.getTransformation();
    +	}
    +
    +	/**
    +	 * Sets the name of this sink. This name is
    +	 * used by the visualization and logging during runtime.
    +	 *
    +	 * @return The named sink.
    +	 */
    +	public CassandraSink<IN> name(String name) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setName(name);
    +		} else {
    +			getStreamTransformation().setName(name);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets an ID for this operator.
    +	 * <p/>
    +	 * <p>The specified ID is used to assign the same operator ID across job
    +	 * submissions (for example when starting a job from a savepoint).
    +	 * <p/>
    +	 * <p><strong>Important</strong>: this ID needs to be unique per
    +	 * transformation and job. Otherwise, job submission will fail.
    +	 *
    +	 * @param uid The unique user-specified ID of this transformation.
    +	 * @return The operator with the specified ID.
    +	 */
    +	public CassandraSink<IN> uid(String uid) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setUid(uid);
    +		} else {
    +			getStreamTransformation().setUid(uid);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets the parallelism for this sink. The degree must be higher than zero.
    +	 *
    +	 * @param parallelism The parallelism for this sink.
    +	 * @return The sink with set parallelism.
    +	 */
    +	public CassandraSink<IN> setParallelism(int parallelism) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setParallelism(parallelism);
    +		} else {
    +			getStreamTransformation().setParallelism(parallelism);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Turns off chaining for this operator so thread co-location will not be
    +	 * used as an optimization.
    +	 * <p/>
    +	 * <p/>
    +	 * Chaining can be turned off for the whole
    +	 * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()}
    +	 * however it is not advised for performance considerations.
    +	 *
    +	 * @return The sink with chaining disabled
    +	 */
    +	public CassandraSink<IN> disableChaining() {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER);
    +		} else {
    +			getStreamTransformation().setChainingStrategy(ChainingStrategy.NEVER);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets the slot sharing group of this operation. Parallel instances of
    +	 * operations that are in the same slot sharing group will be co-located in the same
    +	 * TaskManager slot, if possible.
    +	 * <p/>
    +	 * <p>Operations inherit the slot sharing group of input operations if all input operations
    +	 * are in the same slot sharing group and no slot sharing group was explicitly specified.
    +	 * <p/>
    +	 * <p>Initially an operation is in the default slot sharing group. An operation can be put into
    +	 * the default group explicitly by setting the slot sharing group to {@code "default"}.
    +	 *
    +	 * @param slotSharingGroup The slot sharing group name.
    +	 */
    +	public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) {
    +		if (useDataStreamSink) {
    +			getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
    +		} else {
    +			getStreamTransformation().setSlotSharingGroup(slotSharingGroup);
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Writes a DataStream into a Cassandra database.
    +	 *
    +	 * @param input input DataStream
    +	 * @param <IN>  input type
    +	 * @return CassandraSinkBuilder, to further configure the sink
    +	 */
    +	public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
    +		if (input.getType() instanceof TupleTypeInfo) {
    +			DataStream<T> tupleInput = (DataStream<T>) input;
    +			return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
    +		} else {
    +			return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
    +		}
    +	}
    +
    +	public abstract static class CassandraSinkBuilder<IN> {
    +		protected final DataStream<IN> input;
    +		protected final TypeSerializer<IN> serializer;
    +		protected final TypeInformation<IN> typeInfo;
    +		protected ClusterBuilder builder;
    +		protected String query;
    +		protected CheckpointCommitter committer;
    +		protected boolean isWriteAheadLogEnabled;
    +
    +		public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
    +			this.input = input;
    +			this.typeInfo = typeInfo;
    +			this.serializer = serializer;
    +		}
    +
    +		/**
    +		 * Sets the query that is to be executed for every record.
    +		 *
    +		 * @param query query to use
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setQuery(String query) {
    +			this.query = query;
    +			return this;
    +		}
    +
    +		/**
    +		 * Sets the cassandra host to connect to.
    +		 *
    +		 * @param host host to connect to
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setHost(String host) {
    +			return setHost(host, 9042);
    +		}
    +
    +		/**
    +		 * Sets the cassandra host/port to connect to.
    +		 *
    +		 * @param host host to connect to
    +		 * @param port port to connect to
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setHost(final String host, final int port) {
    +			if (builder != null) {
    +				throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
    +			}
    +			builder = new ClusterBuilder() {
    +				@Override
    +				protected Cluster buildCluster(Cluster.Builder builder) {
    +					return builder.addContactPoint(host).withPort(port).build();
    +				}
    +			};
    +			return this;
    +		}
    +
    +		/**
    +		 * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to cassandra.
    +		 *
    +		 * @param builder ClusterBuilder to configure the connection to cassandra
    +		 * @return this builder
    +		 */
    +		public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
    +			if (builder != null) {
    --- End diff --
    
    `this.builder`instead of `builder`, correct?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1771#discussion_r63871737
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.Session;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
    + * database.
    + * <p/>
    + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
    + */
    +public class CassandraCommitter extends CheckpointCommitter {
    +	private ClusterBuilder builder;
    +	private transient Cluster cluster;
    +	private transient Session session;
    +
    +	private String keySpace = "flink_auxiliary";
    +	private String table = "checkpoints_";
    +
    +	private transient PreparedStatement deleteStatement;
    +	private transient PreparedStatement updateStatement;
    +	private transient PreparedStatement selectStatement;
    +
    +	public CassandraCommitter(ClusterBuilder builder) {
    +		this.builder = builder;
    +		ClosureCleaner.clean(builder, true);
    +	}
    +
    +	public CassandraCommitter(ClusterBuilder builder, String keySpace) {
    +		this(builder);
    +		this.keySpace = keySpace;
    +	}
    +
    +	/**
    +	 * Internally used to set the job ID after instantiation.
    +	 *
    +	 * @param id
    +	 * @throws Exception
    +	 */
    +	public void setJobId(String id) throws Exception {
    +		super.setJobId(id);
    +		table += id;
    +	}
    +
    +	/**
    +	 * Generates the necessary tables to store information.
    +	 *
    +	 * @return
    +	 * @throws Exception
    +	 */
    +	@Override
    +	public void createResource() throws Exception {
    +		cluster = builder.getCluster();
    +		session = cluster.connect();
    +
    +		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", keySpace));
    +		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table));
    +
    +		try {
    +			session.close();
    +		} catch (Exception e) {
    +			LOG.error("Error while closing session.", e);
    --- End diff --
    
    correct.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---