Posted to commits@apex.apache.org by pr...@apache.org on 2016/11/22 05:18:04 UTC
[2/2] apex-malhar git commit: APEXMALHAR-2181 Cassandra Upsert Operator
APEXMALHAR-2181 Cassandra Upsert Operator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/664257b4
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/664257b4
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/664257b4
Branch: refs/heads/master
Commit: 664257b4a90d200137c4254428aa806dcd55c92d
Parents: 05a7ca3
Author: ananthc <an...@gmail.com>
Authored: Tue Nov 22 07:26:12 2016 +1100
Committer: ananthc <an...@gmail.com>
Committed: Tue Nov 22 07:26:12 2016 +1100
----------------------------------------------------------------------
contrib/pom.xml | 8 +-
.../cassandra/AbstractUpsertOutputOperator.java | 1012 ++++++++++++++++++
.../cassandra/CassandraPOJOInputOperator.java | 2 +-
.../cassandra/CassandraPOJOOutputOperator.java | 2 +-
.../contrib/cassandra/CassandraPojoUtils.java | 316 ++++++
.../CassandraPreparedStatementGenerator.java | 287 +++++
.../cassandra/ConnectionStateManager.java | 477 +++++++++
.../cassandra/UpsertExecutionContext.java | 210 ++++
.../AbstractUpsertOutputOperatorCodecsTest.java | 476 ++++++++
...ractUpsertOutputOperatorCompositePKTest.java | 99 ++
...bstractUpsertOutputOperatorCountersTest.java | 120 +++
.../datatorrent/contrib/cassandra/Address.java | 82 ++
.../contrib/cassandra/AddressCodec.java | 85 ++
.../cassandra/CassandraOperatorTest.java | 47 +-
.../cassandra/CompositePrimaryKeyRow.java | 98 ++
.../CompositePrimaryKeyUpdateOperator.java | 58 +
.../cassandra/CounterColumnTableEntry.java | 47 +
.../cassandra/CounterColumnUpdatesOperator.java | 54 +
.../datatorrent/contrib/cassandra/FullName.java | 52 +
.../contrib/cassandra/FullNameCodec.java | 80 ++
.../com/datatorrent/contrib/cassandra/User.java | 130 +++
.../contrib/cassandra/UserUpsertOperator.java | 97 ++
22 files changed, 3817 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 84a7e05..566d03d 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -487,13 +487,13 @@
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
- <version>2.1.8</version>
+ <version>3.1.0</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>14.0.1</version>
+ <version>16.0.1</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
@@ -639,13 +639,13 @@
<artifactId>*</artifactId>
</exclusion>
</exclusions>
- </dependency>
+ </dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
<optional>true</optional>
- </dependency>
+ </dependency>
<dependency>
<groupId>com.github.fge</groupId>
<artifactId>json-schema-validator</artifactId>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperator.java
new file mode 100644
index 0000000..95f98fe
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperator.java
@@ -0,0 +1,1012 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.codahale.metrics.Timer;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.CodecRegistry;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metrics;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.TypeCodec;
+
+
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
+
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * An abstract operator that is used to mutate cassandra rows using PreparedStatements for faster execution,
+ * and accommodates EXACTLY_ONCE semantics if concrete implementations provide a meaningful implementation of
+ * the corresponding abstract method. As Cassandra is not a purely transactional database, this burden is on the
+ * concrete implementation of the operator ONLY during the reconciliation window (and not for any other windows).
+ *
+ * The typical implementation model is as follows :
+ * 1. Create a concrete implementation of this class by extending this class and implement a few methods.
+ * 2. Define the payload, i.e. the POJO that represents a Cassandra row, as part of the execution context
+ * {@link UpsertExecutionContext}. The payload is a template parameter of this class.
+ * 3. The Upstream operator that wants to write to Cassandra does the following
+ * a. Create an instance of {@link UpsertExecutionContext}
+ * b. Set the payload ( an instance of the POJO created as step two above )
+ * c. Set additional execution context parameters like collection handling style, list placement style,
+ * overriding TTL, update only if primary key exists, consistency level etc.
+ * (a short sketch of this step follows this class comment)
+ * 4. The concrete implementation would then execute this context as a cassandra row mutation
+ *
+ * This operator supports the following features
+ * 1. Highly customizable Connection policies. This is achieved by specifying the ConnectionStateManager.
+ * There are quite a few connection management aspects that can be
+ * controlled via {@link ConnectionStateManager} like consistency, load balancing, connection retries,
+ * table to use, keyspace to use etc. Please refer javadoc of {@link ConnectionStateManager}
+ * 2. Support for Collections : Map, List and Sets are supported
+ * User Defined types as part of collections is also supported.
+ * 3. Support exists for both adding entries to and removing entries from an existing collection.
+ * The POJO field that represents a collection holds the entries that are added or removed.
+ * This avoids the read-then-write pattern for the cassandra column, which helps low latency / high write rate
+ * applications as the read is avoided altogether.
+ * 4. Supports list placements : the execution context can be used to specify where the new incoming list
+ * is to be added (in case there is an existing list in the current column of the row being mutated).
+ * Supported options are APPEND or PREPEND to an existing list.
+ * 5. Support for User Defined Types. A pojo can have fields that represent the Cassandra Columns that are custom
+ * user defined types. Concrete implementations of the operator provide a mapping of the cassandra column name
+ * to the TypeCodec that is to be used for that field inside cassandra. Please refer javadocs of
+ * {@link #getCodecsForUserDefinedTypes()} for more details.
+ * 6. Support for custom mapping of POJO payload field names to cassandra column names. In practice,
+ * POJO field names do not always match Cassandra column names, hence this support. It also avoids having to
+ * write a POJO just for the cassandra operator; an existing POJO can be passed to this operator as-is.
+ * Please refer to the javadoc of {@link #getPojoFieldNameToCassandraColumnNameOverride()} for an example
+ * 7. TTL support - A default TTL can be set for the Connection ( via {@link ConnectionStateManager} and then used
+ * for all mutations. This TTL can further be overridden at a tuple execution level to accommodate use cases of
+ * setting custom column expirations typically useful in wide row implementations.
+ * 8. Support for Counter Column tables. Counter tables are also supported with the values inside the incoming
+ * POJO added/subtracted from the counter column accordingly. Please note that the value is not set as an
+ * absolute but rather represents the delta that is added to or subtracted from the current counter value.
+ * 9. Composite primary keys are also supported. All the POJO fields that map to the composite
+ * primary key are used to resolve the primary key in case of a composite primary key table
+ * 10. Support for conditional updates : This operator can be used as an Update Only operator as opposed to an
+ * Upsert operator. i.e. Update only IF EXISTS . This is achieved by setting the appropriate boolean in the
+ * {@link UpsertExecutionContext} tuple that is passed from the upstream operator.
+ * 11. Lenient mapping of POJO fields to Cassandra column names. By default POJO field names are matched
+ * case-insensitively to cassandra column names. This can be refined further by overriding mappings; refer feature 6 above.
+ * 12. Defaults can be overridden at a tuple execution level for TTL and consistency policies
+ * 13. Support for handling Nulls i.e. whether null values in the POJO are to be persisted as is or to be ignored so
+ * that the application need not perform a read to populate a POJO field if it is not available in the context
+ * 14. A few autometrics are provided for monitoring the latency aspects of the cassandra cluster
+ */
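For illustration, a minimal sketch (not part of this commit) of step 3 of the implementation model described above, as seen from an upstream operator. A public no-arg constructor and a setPayload(...) setter on UpsertExecutionContext are assumed; only the corresponding getter and the two style setters used here are visible in this diff.

  private UpsertExecutionContext buildContextFor(Object rowPojo)
  {
    UpsertExecutionContext context = new UpsertExecutionContext(); // assumed no-arg constructor
    context.setPayload(rowPojo); // step 3.b : the POJO representing the cassandra row (assumed setter)
    // step 3.c : optional execution hints for this mutation (setters visible in this diff)
    context.setCollectionMutationStyle(
        UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
    context.setListPlacementStyle(
        UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST);
    return context;
  }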
+
+@InterfaceStability.Evolving
+public abstract class AbstractUpsertOutputOperator extends BaseOperator implements
+ Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener
+{
+
+ protected ConnectionStateManager connectionStateManager;
+
+ private WindowDataManager windowDataManager;
+
+ private long currentWindowId;
+
+ private transient boolean isInSafeMode;
+
+ private transient long reconcilingWindowId;
+
+ private transient boolean isInReconcilingMode;
+
+ protected transient Session session;
+
+ protected transient Cluster cluster;
+
+ transient Map<String, TypeCodec> complexTypeCodecs;
+
+ transient Map<String, Class> userDefinedTypesClass;
+
+ transient Map<String, DataType> columnDefinitions;
+
+ transient Map<String, String> colNamesMap;
+
+ transient Set<String> pkColumnNames;
+
+ transient Set<String> counterColumns;
+
+ transient Set<String> collectionColumns;
+
+ transient Set<String> listColumns;
+
+ transient Set<String> mapColumns;
+
+ transient Set<String> setColumns;
+
+ transient Set<String> userDefinedTypeColumns;
+
+ transient Set<String> regularColumns;
+
+ protected Map<String, Object> getters;
+
+ protected Map<String, TypeCodec> codecsForCassandraColumnNames;
+
+ CassandraPreparedStatementGenerator cassandraPreparedStatementGenerationUtil;
+
+ transient Map<Long, PreparedStatement> preparedStatementTypes;
+ transient Class<?> tuplePayloadClass;
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(AbstractUpsertOutputOperator.class);
+
+ private static final String CASSANDRA_CONNECTION_PROPS_FILENAME = "CassandraOutputOperator.properties";
+
+ // Metrics
+
+ @AutoMetric
+ transient long ignoredRequestsDuetoIfExistsCheck = 0;
+
+ @AutoMetric
+ transient long successfullWrites = 0;
+
+ @AutoMetric
+ long totalWriteRequestsAttempted = 0;
+
+ @AutoMetric
+ transient int numberOfHostsWrittenTo = 0;
+
+ @AutoMetric
+ transient double fifteenMinuteWriteRateLatency = 0.0;
+
+ @AutoMetric
+ transient double fiveMinuteWriteRateLatency = 0.0;
+
+ @AutoMetric
+ transient double oneMinuteWriteRateLatency = 0.0;
+
+ @AutoMetric
+ transient double meanWriteRateLatency = 0.0;
+
+ @AutoMetric
+ transient long totalIgnoresInThisWindow = 0;
+
+ @AutoMetric
+ long totalIgnoresSinceStart = 0;
+
+ @AutoMetric
+ transient long totalWriteTimeoutsInThisWindow = 0;
+
+ @AutoMetric
+ long totalWriteTimeoutsSinceStart = 0;
+
+ @AutoMetric
+ transient long totalWriteRetriesInThisWindow = 0;
+
+ @AutoMetric
+ long totalWriteRetriesSinceStart = 0;
+
+ @AutoMetric
+ transient long writesWithConsistencyOne = 0;
+
+ @AutoMetric
+ transient long writesWithConsistencyTwo = 0;
+
+ @AutoMetric
+ transient long writesWithConsistencyThree = 0;
+
+ @AutoMetric
+ transient long writesWithConsistencyAll = 0;
+
+ @AutoMetric
+ transient long writesWithConsistencyLocalOne = 0;
+
+ @AutoMetric
+ transient long writesWithConsistencyQuorum = 0;
+
+ @AutoMetric
+ transient long writesWithConsistencyLocalQuorum = 0;
+
+ @AutoMetric
+ transient long writeWithConsistencyLocalSerial = 0;
+
+ @AutoMetric
+ transient long writesWithConsistencyEachQuorum = 0;
+
+ @AutoMetric
+ transient long writesWithConsistencySerial = 0;
+
+ @AutoMetric
+ transient long writesWithConsistencyAny = 0;
+
+ transient Set<Host> uniqueHostsWrittenToInCurrentWindow;
+
+
+ @InputPortFieldAnnotation
+ public final transient DefaultInputPort<UpsertExecutionContext> input = new DefaultInputPort<UpsertExecutionContext>()
+ {
+ @Override
+ public void process(final UpsertExecutionContext tuple)
+ {
+ if (!isEligbleForPassivation(tuple)) {
+ return;
+ }
+ BoundStatement stmnt = setDefaultsAndPrepareBoundStatement(tuple);
+ ResultSet result = session.execute(stmnt);
+ updatePerRowMetric(result);
+ }
+ }; // end of input port implementation
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ windowDataManager = getWindowDataManager();
+ if ( windowDataManager == null) {
+ windowDataManager = new FSWindowDataManager();
+ }
+ windowDataManager.setup(context);
+ }
+
+ @Override
+ public void teardown()
+ {
+ super.teardown();
+ if (null != windowDataManager) {
+ windowDataManager.teardown();
+ }
+ }
+
+
+ /**
+ * Primarily resets the per window counter metrics.
+ * @param windowId The windowid as provided by the apex framework
+ */
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ totalIgnoresInThisWindow = 0;
+ totalWriteTimeoutsInThisWindow = 0;
+ totalWriteRetriesInThisWindow = 0;
+ uniqueHostsWrittenToInCurrentWindow.clear();
+ successfullWrites = 0;
+ ignoredRequestsDuetoIfExistsCheck = 0;
+ writesWithConsistencyOne = 0;
+ writesWithConsistencyTwo = 0;
+ writesWithConsistencyThree = 0;
+ writesWithConsistencyAll = 0;
+ writesWithConsistencyLocalOne = 0;
+ writesWithConsistencyQuorum = 0;
+ writesWithConsistencyLocalQuorum = 0;
+ writeWithConsistencyLocalSerial = 0;
+ writesWithConsistencyEachQuorum = 0;
+ writesWithConsistencySerial = 0;
+ writesWithConsistencyAny = 0;
+ currentWindowId = windowId;
+ if ( currentWindowId != Stateless.WINDOW_ID) {
+ if (currentWindowId > reconcilingWindowId) {
+ isInSafeMode = false;
+ isInReconcilingMode = false;
+ }
+ if (currentWindowId == reconcilingWindowId) {
+ isInReconcilingMode = true;
+ isInSafeMode = false;
+ }
+ if (currentWindowId < reconcilingWindowId) {
+ isInReconcilingMode = false;
+ isInSafeMode = true;
+ }
+ }
+ }
+
+ /**
+ * Builds the metrics that can be sent to Application master.
+ * Note that some of the metrics are computed in the cassandra driver itself and hence are simply
+ * extracted from the driver state.
+ */
+ @Override
+ public void endWindow()
+ {
+ super.endWindow();
+ Timer timerForThisWindow = session.getCluster().getMetrics().getRequestsTimer();
+ totalWriteRequestsAttempted += timerForThisWindow.getCount();
+ numberOfHostsWrittenTo = uniqueHostsWrittenToInCurrentWindow.size();
+ fifteenMinuteWriteRateLatency = timerForThisWindow.getFifteenMinuteRate();
+ fiveMinuteWriteRateLatency = timerForThisWindow.getFiveMinuteRate();
+ oneMinuteWriteRateLatency = timerForThisWindow.getOneMinuteRate();
+ meanWriteRateLatency = timerForThisWindow.getMeanRate();
+ Metrics.Errors errors = session.getCluster().getMetrics().getErrorMetrics();
+ totalIgnoresInThisWindow = errors.getIgnores().getCount() - totalIgnoresSinceStart;
+ totalIgnoresSinceStart = errors.getIgnores().getCount();
+ totalWriteTimeoutsInThisWindow = errors.getWriteTimeouts().getCount() - totalWriteTimeoutsSinceStart;
+ totalWriteTimeoutsSinceStart = errors.getWriteTimeouts().getCount();
+ totalWriteRetriesInThisWindow = errors.getRetriesOnWriteTimeout().getCount() - totalWriteRetriesSinceStart;
+ totalWriteRetriesSinceStart = errors.getRetriesOnWriteTimeout().getCount();
+ try {
+ // we do not need any particular state and hence reusing the window id itself
+ windowDataManager.save(currentWindowId,currentWindowId);
+ } catch (IOException e) {
+ LOG.error("Error while persisting the current window state " + currentWindowId + " because " + e.getMessage());
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ /**
+ * Initializes cassandra cluster connections as specified by the Connection State manager.
+ * Aspects that are initialized here include identifying primary key column names, non-primary key columns,
+ * collection type columns and counter columns.
+ * It also queries the keyspace and table metadata for extracting the above information.
+ * It finally prepares all possible prepared statements that can be executed in the lifetime of the operator
+ * for various permutations like APPEND/REMOVE COLLECTIONS , LIST APPEND/PREPEND , set nulls, set TTLs etc
+ * @param context The apex framework context
+ */
+ @Override
+ public void activate(Context.OperatorContext context)
+ {
+ ConnectionStateManager.ConnectionBuilder connectionBuilder = withConnectionBuilder();
+ if (connectionBuilder == null) {
+ connectionBuilder = buildConnectionBuilderFromPropertiesFile();
+ }
+ checkNotNull(connectionBuilder, " Connection Builder cannot be null.");
+ connectionStateManager = connectionBuilder.initialize();
+ cluster = connectionStateManager.getCluster();
+ session = connectionStateManager.getSession();
+ checkNotNull(session, "Cassandra session cannot be null");
+ tuplePayloadClass = getPayloadPojoClass();
+ columnDefinitions = new HashMap<>();
+ counterColumns = new HashSet<>();
+ collectionColumns = new HashSet<>();
+ pkColumnNames = new HashSet<>();
+ listColumns = new HashSet<>();
+ mapColumns = new HashSet<>();
+ setColumns = new HashSet<>();
+ codecsForCassandraColumnNames = new HashMap<>();
+ userDefinedTypeColumns = new HashSet<>();
+ regularColumns = new HashSet<>();
+ colNamesMap = new HashMap<>();
+ getters = new HashMap<>();
+ userDefinedTypesClass = new HashMap<>();
+ uniqueHostsWrittenToInCurrentWindow = new HashSet<>();
+ registerCodecs();
+ KeyspaceMetadata keyspaceMetadata = cluster.getMetadata()
+ .getKeyspace(connectionStateManager.getKeyspaceName());
+ TableMetadata tableMetadata = keyspaceMetadata
+ .getTable(connectionStateManager.getTableName());
+ registerPrimaryKeyColumnDefinitions(tableMetadata);
+ registerNonPKColumnDefinitions(tableMetadata);
+ preparedStatementTypes = new HashMap<>();
+ generatePreparedStatements();
+ registerGettersForPayload();
+ isInSafeMode = false;
+ isInReconcilingMode = false;
+ reconcilingWindowId = Stateless.WINDOW_ID;
+ if ( (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID) &&
+ context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) <
+ windowDataManager.getLargestCompletedWindow()) {
+ isInSafeMode = true;
+ reconcilingWindowId = windowDataManager.getLargestCompletedWindow() + 1;
+ isInReconcilingMode = false;
+ }
+ }
+
+ @Override
+ public void deactivate()
+ {
+ connectionStateManager.close();
+ }
+
+
+ @Override
+ public void committed(long windowId)
+ {
+ try {
+ windowDataManager.committed(windowId);
+ } catch (IOException e) {
+ LOG.error("Error while committing the window id " + windowId + " because " + e.getMessage());
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void beforeCheckpoint(long windowId)
+ {
+ // nothing to be done here. Prevents concrete implementations from being forced to implement this
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ // Nothing to be done here. Concrete operators can override if needed.
+ }
+
+ private ConnectionStateManager.ConnectionBuilder buildConnectionBuilderFromPropertiesFile()
+ {
+ ConnectionStateManager.ConnectionBuilder propFileBasedConnectionBuilder = null;
+ Properties config = new Properties();
+ try (InputStream cassandraPropsFile = getClass().getClassLoader().getResourceAsStream(
+ CASSANDRA_CONNECTION_PROPS_FILENAME)) {
+ config.load(cassandraPropsFile);
+ propFileBasedConnectionBuilder = new ConnectionStateManager.ConnectionBuilder();
+ return propFileBasedConnectionBuilder
+ .withClusterNameAs(config.getProperty(ConnectionStateManager.ConnectionBuilder.CLUSTER_NAME_IN_PROPS_FILE))
+ .withDCNameAs(config.getProperty(ConnectionStateManager.ConnectionBuilder.DC_NAME_IN_PROPS_FILE))
+ .withKeySpaceNameAs(config.getProperty(ConnectionStateManager.ConnectionBuilder.KEYSPACE_NAME_IN_PROPS_FILE))
+ .withTableNameAs(config.getProperty(ConnectionStateManager.ConnectionBuilder.TABLE_NAME_IN_PROPS_FILE))
+ .withSeedNodes(config.getProperty(ConnectionStateManager.ConnectionBuilder.SEEDNODES_IN_PROPS_FILE));
+ } catch (Exception ex) {
+ LOG.error("Error while trying to load cassandra config from properties file " +
+ CASSANDRA_CONNECTION_PROPS_FILENAME + " because " + ex.getMessage(), ex);
+ return null;
+ }
+ }
+
+ protected boolean isEligbleForPassivation(final UpsertExecutionContext tuple)
+ {
+ if (isInSafeMode) {
+ return false;
+ }
+ if (isInReconcilingMode) {
+ return reconcileRecord(tuple,currentWindowId);
+ }
+ return true;
+ }
+
+ private BoundStatement setDefaultsAndPrepareBoundStatement(UpsertExecutionContext tuple)
+ {
+ UpsertExecutionContext.NullHandlingMutationStyle nullHandlingMutationStyle = tuple.getNullHandlingMutationStyle();
+ if (UpsertExecutionContext.NullHandlingMutationStyle.UNDEFINED == nullHandlingMutationStyle) {
+ nullHandlingMutationStyle = UpsertExecutionContext.NullHandlingMutationStyle.SET_NULL_COLUMNS;
+ }
+ boolean setNulls = true;
+ if (nullHandlingMutationStyle != UpsertExecutionContext.NullHandlingMutationStyle.SET_NULL_COLUMNS) {
+ setNulls = false;
+ }
+ UpsertExecutionContext.CollectionMutationStyle collectionMutationStyle = tuple.getCollectionMutationStyle();
+ if ((collectionMutationStyle == null) ||
+ (collectionMutationStyle == UpsertExecutionContext.CollectionMutationStyle.UNDEFINED) ) {
+ tuple.setCollectionMutationStyle(UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
+ }
+ UpsertExecutionContext.ListPlacementStyle listPlacementStyle = tuple.getListPlacementStyle();
+ if ( (listPlacementStyle == null) || (listPlacementStyle == UpsertExecutionContext.ListPlacementStyle.UNDEFINED) ) {
+ tuple.setListPlacementStyle(UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST);
+ }
+ PreparedStatement preparedStatement = resolvePreparedStatementForCurrentExecutionContext(tuple);
+ BoundStatement stmnt = processPayloadForExecution(preparedStatement, tuple, setNulls);
+ if ((tuple.isTtlOverridden()) || (connectionStateManager.isTTLSet())) {
+ int ttlToUse = connectionStateManager.getDefaultTtlInSecs();
+ if (tuple.isTtlOverridden()) {
+ ttlToUse = tuple.getOverridingTTL();
+ }
+ stmnt.setInt(CassandraPreparedStatementGenerator.TTL_PARAM_NAME, ttlToUse);
+ }
+ if (tuple.isOverridingConsistencyLevelSet()) {
+ ConsistencyLevel currentConsistencyLevel = tuple.getOverridingConsistencyLevel();
+ if (currentConsistencyLevel.isSerial()) {
+ stmnt.setSerialConsistencyLevel(tuple.getOverridingConsistencyLevel());
+ } else {
+ stmnt.setConsistencyLevel(tuple.getOverridingConsistencyLevel());
+ }
+ }
+ LOG.debug("Executing statement " + preparedStatement.getQueryString());
+ return stmnt;
+ }
+
+ private void updatePerRowMetric(ResultSet result)
+ {
+ uniqueHostsWrittenToInCurrentWindow.add(result.getExecutionInfo().getQueriedHost());
+ updateConsistencyLevelMetrics(result.getExecutionInfo().getAchievedConsistencyLevel());
+ successfullWrites += 1;
+ if (!result.wasApplied()) {
+ ignoredRequestsDuetoIfExistsCheck += 1;
+ }
+ }
+
+ private void updateConsistencyLevelMetrics(ConsistencyLevel resultConsistencyLevel)
+ {
+ if (resultConsistencyLevel == null) {
+ return;
+ }
+ switch (resultConsistencyLevel) {
+ case ALL:
+ writesWithConsistencyAll += 1;
+ break;
+ case ANY:
+ writesWithConsistencyAny += 1;
+ break;
+ case EACH_QUORUM:
+ writesWithConsistencyEachQuorum += 1;
+ break;
+ case LOCAL_ONE:
+ writesWithConsistencyLocalOne += 1;
+ break;
+ case LOCAL_QUORUM:
+ writesWithConsistencyLocalQuorum += 1;
+ break;
+ case LOCAL_SERIAL:
+ writeWithConsistencyLocalSerial += 1;
+ break;
+ case ONE:
+ writesWithConsistencyOne += 1;
+ break;
+ case QUORUM:
+ writesWithConsistencyQuorum += 1;
+ break;
+ case SERIAL:
+ writesWithConsistencySerial += 1;
+ break;
+ case THREE:
+ writesWithConsistencyThree += 1;
+ break;
+ case TWO:
+ writesWithConsistencyTwo += 1;
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Shortlists the prepared statement from a cache that is populated initially.
+ * @param tuple The execution context that is used to mutate the current cassandra row
+ * @return The prepared statement that is applicable for the current execution context
+ */
+ private PreparedStatement resolvePreparedStatementForCurrentExecutionContext(UpsertExecutionContext tuple)
+ {
+ EnumSet<OperationContext> operationContextValue = EnumSet.noneOf(OperationContext.class);
+
+ UpsertExecutionContext.CollectionMutationStyle collectionMutationStyle = tuple.getCollectionMutationStyle();
+ if (collectionMutationStyle != null) {
+ if (collectionMutationStyle == UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION) {
+ operationContextValue.add(OperationContext.COLLECTIONS_APPEND);
+ }
+ if (collectionMutationStyle == UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION) {
+ operationContextValue.add(OperationContext.COLLECTIONS_REMOVE);
+ }
+ }
+ UpsertExecutionContext.ListPlacementStyle listPlacementStyle = tuple.getListPlacementStyle();
+ boolean isListContextSet = false;
+ if ((listPlacementStyle != null) && (collectionMutationStyle ==
+ UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION)) {
+ if (listPlacementStyle == UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST) {
+ operationContextValue.add(OperationContext.LIST_APPEND);
+ isListContextSet = true;
+ }
+ if (listPlacementStyle == UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST) {
+ operationContextValue.add(OperationContext.LIST_PREPEND);
+ isListContextSet = true;
+ }
+ }
+ if (!isListContextSet) {
+ // use cases when remove is specified but we do not want to build separate prepared statements
+ operationContextValue.add(OperationContext.LIST_APPEND);
+ }
+ if ((connectionStateManager.isTTLSet()) || (tuple.isTtlOverridden())) {
+ operationContextValue.add(OperationContext.TTL_SET);
+ } else {
+ operationContextValue.add(OperationContext.TTL_NOT_SET);
+ }
+ if (tuple.isUpdateOnlyIfPrimaryKeyExists()) {
+ operationContextValue.add(OperationContext.IF_EXISTS_CHECK_PRESENT);
+ } else {
+ operationContextValue.add(OperationContext.IF_EXISTS_CHECK_ABSENT);
+ }
+ return preparedStatementTypes.get(CassandraPreparedStatementGenerator
+ .getSlotIndexForMutationContextPreparedStatement(operationContextValue));
+ }
+
+ /**
+ * Generates a BoundStatement that can be executed for the given incoming tuple. This bound statement is then
+ * executed as a command.
+ * @param ps The prepared statement that was shortlisted to execute the given tuple
+ * @param tuple The tuple that represents the current execution context
+ * @param setNulls Whether null POJO values are bound as explicit nulls in the statement (true) or left
+ * unset (false)
+ * @return The bound statement appropriately built
+ */
+ @SuppressWarnings(value = "unchecked")
+ private BoundStatement processPayloadForExecution(final PreparedStatement ps, final UpsertExecutionContext tuple,
+ final boolean setNulls)
+ {
+ BoundStatement boundStatement = ps.bind();
+ Object pojoPayload = tuple.getPayload();
+ for (String cassandraColName : getters.keySet()) {
+ DataType dataType = columnDefinitions.get(cassandraColName);
+ CassandraPojoUtils.populateBoundStatementWithValue(boundStatement,getters,dataType,cassandraColName,
+ pojoPayload,setNulls,codecsForCassandraColumnNames);
+ }
+ return boundStatement;
+ }
+
+
+ /**
+ * Builds the map that holds the getters for each cassandra column.
+ * Handles aspects like case-insensitive matching and overriding of column names with custom mappings.
+ */
+ private void registerGettersForPayload()
+ {
+ Field[] classFields = tuplePayloadClass.getDeclaredFields();
+ Set<String> allColNames = new HashSet<>();
+ Map<String, DataType> dataTypeMap = new HashMap<>();
+ Map<String,String> overridingColnamesMap = getPojoFieldNameToCassandraColumnNameOverride();
+ if ( overridingColnamesMap == null) {
+ overridingColnamesMap = new HashMap<>();
+ }
+ allColNames.addAll(pkColumnNames);
+ allColNames.addAll(regularColumns);
+ Set<String> normalizedColNames = new HashSet<>();
+ Iterator<String> simpleColIterator = allColNames.iterator();
+ while (simpleColIterator.hasNext()) {
+ String aCol = simpleColIterator.next();
+ normalizedColNames.add(aCol.toLowerCase());
+ dataTypeMap.put(aCol.toLowerCase(), columnDefinitions.get(aCol));
+ colNamesMap.put(aCol.toLowerCase(), aCol);
+ codecsForCassandraColumnNames.put(aCol, complexTypeCodecs.get(aCol.toLowerCase()));
+ }
+ for (Field aField : classFields) {
+ String aFieldName = aField.getName();
+ if ( (normalizedColNames.contains(aFieldName.toLowerCase())) ||
+ (overridingColnamesMap.containsKey(aFieldName)) ) {
+
+ String getterExpr = aFieldName;
+ DataType returnDataTypeOfGetter = dataTypeMap.get(aFieldName.toLowerCase());
+ if (returnDataTypeOfGetter == null) {
+ returnDataTypeOfGetter = dataTypeMap.get(overridingColnamesMap.get(aFieldName));
+ }
+ Object getter = CassandraPojoUtils.resolveGetterForField(tuplePayloadClass,getterExpr,
+ returnDataTypeOfGetter,userDefinedTypesClass);
+ String resolvedColumnName = colNamesMap.get(aFieldName.toLowerCase());
+ if (overridingColnamesMap.containsKey(aFieldName)) {
+ resolvedColumnName = overridingColnamesMap.get(aFieldName);
+ }
+ getters.put(resolvedColumnName, getter);
+ }
+ }
+ }
+
+ private void registerCodecs()
+ {
+ complexTypeCodecs = getCodecsForUserDefinedTypes();
+ if (complexTypeCodecs != null) {
+ CodecRegistry registry = cluster.getConfiguration().getCodecRegistry();
+ if (cluster.getConfiguration().getProtocolOptions().getProtocolVersion().toInt() < 4) {
+ LOG.error("Custom codecs are not supported for protocol version < 4");
+ throw new RuntimeException("Custom codecs are not supported for protocol version < 4");
+ }
+ for (String typeCodecStr : complexTypeCodecs.keySet()) {
+ TypeCodec codec = complexTypeCodecs.get(typeCodecStr);
+ registry.register(codec);
+ userDefinedTypesClass.put(typeCodecStr, codec.getJavaType().getRawType());
+ }
+ } else {
+ complexTypeCodecs = new HashMap<>();
+ }
+ }
+
+ private void registerNonPKColumnDefinitions(final TableMetadata tableMetadata)
+ {
+ List<ColumnMetadata> colInfoForTable = tableMetadata.getColumns();
+ for (ColumnMetadata aColumnDefinition : colInfoForTable) {
+ if (aColumnDefinition.getType().isCollection()) {
+ collectionColumns.add(aColumnDefinition.getName());
+ }
+ if (!pkColumnNames.contains(aColumnDefinition.getName())) {
+ columnDefinitions.put(aColumnDefinition.getName(), aColumnDefinition.getType());
+ regularColumns.add(aColumnDefinition.getName());
+ }
+ parseForSpecialDataType(aColumnDefinition);
+ }
+ }
+
+ private void parseForSpecialDataType(final ColumnMetadata aColumnDefinition)
+ {
+ switch (aColumnDefinition.getType().getName()) {
+ case COUNTER:
+ counterColumns.add(aColumnDefinition.getName());
+ break;
+ case MAP:
+ mapColumns.add(aColumnDefinition.getName());
+ break;
+ case SET:
+ setColumns.add(aColumnDefinition.getName());
+ break;
+ case LIST:
+ listColumns.add(aColumnDefinition.getName());
+ break;
+ case UDT:
+ userDefinedTypeColumns.add(aColumnDefinition.getName());
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void registerPrimaryKeyColumnDefinitions(final TableMetadata tableMetadata)
+ {
+ List<ColumnMetadata> primaryKeyColumns = tableMetadata.getPrimaryKey();
+ for (ColumnMetadata primaryColumn : primaryKeyColumns) {
+ columnDefinitions.put(primaryColumn.getName(), primaryColumn.getType());
+ pkColumnNames.add(primaryColumn.getName());
+ parseForSpecialDataType(primaryColumn);
+ }
+ }
+
+ private void generatePreparedStatements()
+ {
+ cassandraPreparedStatementGenerationUtil = new CassandraPreparedStatementGenerator(
+ pkColumnNames, counterColumns, listColumns,
+ mapColumns, setColumns, columnDefinitions);
+ cassandraPreparedStatementGenerationUtil.generatePreparedStatements(session, preparedStatementTypes,
+ connectionStateManager.getKeyspaceName(), connectionStateManager.getTableName());
+ }
+
+ public Map<String, DataType> getColumnDefinitions()
+ {
+ return columnDefinitions;
+ }
+
+ public void setColumnDefinitions(final Map<String, DataType> columnDefinitions)
+ {
+ this.columnDefinitions = columnDefinitions;
+ }
+
+ public Map<String, Class> getUserDefinedTypesClass()
+ {
+ return userDefinedTypesClass;
+ }
+
+ public void setUserDefinedTypesClass(final Map<String, Class> userDefinedTypesClass)
+ {
+ this.userDefinedTypesClass = userDefinedTypesClass;
+ }
+
+ public Set<String> getPkColumnNames()
+ {
+ return pkColumnNames;
+ }
+
+ public void setPkColumnNames(final Set<String> pkColumnNames)
+ {
+ this.pkColumnNames = pkColumnNames;
+ }
+
+ public Set<String> getCounterColumns()
+ {
+ return counterColumns;
+ }
+
+ public void setCounterColumns(final Set<String> counterColumns)
+ {
+ this.counterColumns = counterColumns;
+ }
+
+ public Set<String> getCollectionColumns()
+ {
+ return collectionColumns;
+ }
+
+ public void setCollectionColumns(final Set<String> collectionColumns)
+ {
+ this.collectionColumns = collectionColumns;
+ }
+
+ public Set<String> getListColumns()
+ {
+ return listColumns;
+ }
+
+ public void setListColumns(final Set<String> listColumns)
+ {
+ this.listColumns = listColumns;
+ }
+
+ public Set<String> getMapColumns()
+ {
+ return mapColumns;
+ }
+
+ public void setMapColumns(Set<String> mapColumns)
+ {
+ this.mapColumns = mapColumns;
+ }
+
+ public Set<String> getSetColumns()
+ {
+ return setColumns;
+ }
+
+ public void setSetColumns(Set<String> setColumns)
+ {
+ this.setColumns = setColumns;
+ }
+
+ public Set<String> getUserDefinedTypeColumns()
+ {
+ return userDefinedTypeColumns;
+ }
+
+ public void setUserDefinedTypeColumns(Set<String> userDefinedTypeColumns)
+ {
+ this.userDefinedTypeColumns = userDefinedTypeColumns;
+ }
+
+ public Set<String> getRegularColumns()
+ {
+ return regularColumns;
+ }
+
+ public void setRegularColumns(Set<String> regularColumns)
+ {
+ this.regularColumns = regularColumns;
+ }
+
+ public Map<Long, PreparedStatement> getPreparedStatementTypes()
+ {
+ return preparedStatementTypes;
+ }
+
+ public void setPreparedStatementTypes(Map<Long, PreparedStatement> preparedStatementTypes)
+ {
+ this.preparedStatementTypes = preparedStatementTypes;
+ }
+
+ public Map<String, Object> getGetters()
+ {
+ return getters;
+ }
+
+ public void setGetters(Map<String, Object> getters)
+ {
+ this.getters = getters;
+ }
+
+ public ConnectionStateManager getConnectionStateManager()
+ {
+ return connectionStateManager;
+ }
+
+ public void setConnectionStateManager(ConnectionStateManager connectionStateManager)
+ {
+ this.connectionStateManager = connectionStateManager;
+ }
+
+ public WindowDataManager getWindowDataManager()
+ {
+ return windowDataManager;
+ }
+
+ public void setWindowDataManager(WindowDataManager windowDataManager)
+ {
+ this.windowDataManager = windowDataManager;
+ }
+
+ /***
+ * Concrete operator implementations define the connection builder properties by implementing this method.
+ * Please refer to {@link com.datatorrent.contrib.cassandra.ConnectionStateManager.ConnectionBuilder} for
+ * an example implementation of the ConnectionStateManager instantiation.
+ * Note that if this method returns null, the connection properties are
+ * fetched from a properties file loaded from the classpath.
+ * @return The connection state manager that is to be used for this Operator.
+ */
+ public ConnectionStateManager.ConnectionBuilder withConnectionBuilder()
+ {
+ return null;
+ }
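As a hedged sketch (not from this commit), a concrete operator could override this hook roughly as shown below. Every builder method used here appears in buildConnectionBuilderFromPropertiesFile() above; the literal values and the format of the seed-node string are assumptions.

  @Override
  public ConnectionStateManager.ConnectionBuilder withConnectionBuilder()
  {
    // Hypothetical cluster settings; the operator itself calls initialize() on the returned builder.
    return new ConnectionStateManager.ConnectionBuilder()
        .withClusterNameAs("TestCluster")
        .withDCNameAs("datacenter1")
        .withKeySpaceNameAs("testapp")
        .withTableNameAs("users")
        .withSeedNodes("localhost");
  }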
+
+ /**
+ * The implementing concrete operator needs to provide this map. The key is the name of the cassandra column
+ * that the codec maps to; the TypeCodec value represents the codec used for that column in cassandra.
+ * Please refer to the test example UserUpsertOperator.java for an implementation.
+ * Concrete implementations can return a null if there are no user defined types
+ * @return A map giving column name to the codec to be used
+ */
+ public abstract Map<String, TypeCodec> getCodecsForUserDefinedTypes();
+
+ /**
+ * Defines the Pojo class that is used to represent the row in the table that is set in the ConnectionStateManager
+ * instance. The Class that is returned here should match the template type of the execution context
+ * {@link UpsertExecutionContext}
+ * @return The class that is used as the payload of the Execution context.
+ */
+ public abstract Class getPayloadPojoClass();
+
+ /**
+ * Concrete implementations can override this method to provide a custom mapping of POJO field names to
+ * cassandra column names. This is useful when POJOs that are acting as payloads
+ * cannot comply with cassandra column naming conventions, e.g. Cassandra column names
+ * might have underscores while POJO fields might not be in that format.
+ * Note that case sensitivity is ignored when trying to match Cassandra column names.
+ * {@code
+ * @Override
+ protected Map<String, String> getPojoFieldNameToCassandraColumnNameOverride()
+ {
+ Map<String,String> overridingColMap = new HashMap<>();
+ overridingColMap.put("topScores","top_scores"); // topScores is POJO field name and top_scores is Cassandra col
+ return overridingColMap;
+ }
+ *
+ * }
+ * @return A map of the POJO field name as key and value as the Cassandra Column name
+ */
+ protected Map<String,String> getPojoFieldNameToCassandraColumnNameOverride()
+ {
+ return new HashMap<>();
+ }
+
+ /**
+ *
+ * Since Cassandra is not a strictly transactional system, if the Apex operator crashes while a window is in
+ * transit, the same data might be replayed and written to cassandra again. In such situations, control is
+ * given to the concrete operator instance to decide whether it wants to let the write happen
+ * or simply skip it. Return true if the write needs to go through or return false to prevent the write
+ * from happening.
+ * Note that this check only happens for one window of data when an operator is resuming from a previous start.
+ * In the case of a restart from a previously checkpointed window, the operator simply runs in a "safe mode"
+ * until it reaches the reconciliation window. This is the only window in which this method is called. Hence it
+ * might be okay if this method is "heavy". For example the implementor can choose to read from cassandra for the
+ * incoming record key entry and decide to let the write through or ignore it completely. This is on a per tuple
+ * basis for the reconciliation window only. After the reconciliation window, the data simply flows through
+ * without this check.
+ * @param T The incoming tuple (the execution context carrying the POJO payload) being considered for the write
+ * @param windowId The id of the current (reconciling) window
+ * @return Whether the current POJO that is being passed in should be allowed to write into the cassandra row just for
+ * the reconciling window phase
+ ***/
+
+ abstract boolean reconcileRecord(Object T, long windowId);
+
+ enum OperationContext
+ {
+ UNDEFINED,
+ COLLECTIONS_APPEND,
+ COLLECTIONS_REMOVE,
+ LIST_APPEND,
+ LIST_PREPEND,
+ TTL_SET,
+ TTL_NOT_SET,
+ IF_EXISTS_CHECK_PRESENT,
+ IF_EXISTS_CHECK_ABSENT,
+ }
+}
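To summarize the contract of the abstract class above, here is a minimal concrete subclass sketch. It is not the committed reference implementation (that is UserUpsertOperator.java in the test sources of this change) and the User payload class is an assumption.

package com.datatorrent.contrib.cassandra;

import java.util.Map;
import com.datastax.driver.core.TypeCodec;

// Illustrative sketch only; see UserUpsertOperator.java in the tests for the committed example.
public class MinimalUpsertOutputOperator extends AbstractUpsertOutputOperator
{
  @Override
  public Map<String, TypeCodec> getCodecsForUserDefinedTypes()
  {
    return null; // no user defined type columns in this sketch; null is explicitly allowed
  }

  @Override
  public Class getPayloadPojoClass()
  {
    return User.class; // assumed payload POJO representing one row of the target table
  }

  @Override
  boolean reconcileRecord(Object tuple, long windowId)
  {
    // Consulted only for the reconciling window after a restart; returning true lets every
    // replayed tuple be upserted again, which is acceptable when the upsert is idempotent.
    return true;
  }
}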
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index 9c56178..f43777f 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -345,7 +345,7 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O
((Setter<Object, List<?>>)setters.get(i)).set(obj, list);
break;
case TIMESTAMP:
- final Date date = row.getDate(columnName);
+ final Date date = new Date(row.getDate(columnName).getMillisSinceEpoch());
((Setter<Object, Date>)setters.get(i)).set(obj, date);
break;
default:
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
index 2d1fea3..a191bb0 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
@@ -295,7 +295,7 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
break;
case TIMESTAMP:
final Date date = ((Getter<Object, Date>)getters.get(i)).get(tuple);
- boundStmnt.setDate(i, date);
+ boundStmnt.setDate(i, LocalDate.fromMillisSinceEpoch(date.getTime()));
break;
default:
throw new RuntimeException("unsupported data type " + type.getName());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPojoUtils.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPojoUtils.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPojoUtils.java
new file mode 100644
index 0000000..b1f5f4a
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPojoUtils.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.LocalDate;
+import com.datastax.driver.core.TypeCodec;
+import com.datatorrent.lib.util.PojoUtils;
+
+/***
+ * Used to manage simple data type based getters for given cassandra columns
+ */
+public class CassandraPojoUtils
+{
+ /***
+ * Resolves a getter that can be associated with the given field name in the POJO, matching the given
+ * cassandra data type
+ * @param tuplePayloadClass The tuple class that is used to build the getter from
+ * @param getterExpr The name of the field representing the getter that needs to be generated
+ * @param returnDataTypeOfGetter The Data type of the cassandra column
+ * @param userDefinedTypesClass A map that can provide for a UDT class given a column name
+ * @return The getter object that can be used to extract the value at runtime
+ */
+ public static Object resolveGetterForField(Class tuplePayloadClass, String getterExpr,
+ DataType returnDataTypeOfGetter, Map<String,Class> userDefinedTypesClass)
+ {
+ Object getter = null;
+ switch (returnDataTypeOfGetter.getName()) {
+ case INT:
+ getter = PojoUtils.createGetterInt(tuplePayloadClass, getterExpr);
+ break;
+ case BIGINT:
+ case COUNTER:
+ getter = PojoUtils.createGetterLong(tuplePayloadClass, getterExpr);
+ break;
+ case ASCII:
+ case TEXT:
+ case VARCHAR:
+ getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, String.class);
+ break;
+ case BOOLEAN:
+ getter = PojoUtils.createGetterBoolean(tuplePayloadClass, getterExpr);
+ break;
+ case FLOAT:
+ getter = PojoUtils.createGetterFloat(tuplePayloadClass, getterExpr);
+ break;
+ case DOUBLE:
+ getter = PojoUtils.createGetterDouble(tuplePayloadClass, getterExpr);
+ break;
+ case DECIMAL:
+ getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, BigDecimal.class);
+ break;
+ case SET:
+ getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, Set.class);
+ break;
+ case MAP:
+ getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, Map.class);
+ break;
+ case LIST:
+ getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, List.class);
+ break;
+ case TIMESTAMP:
+ getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, Date.class);
+ break;
+ case UUID:
+ getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, UUID.class);
+ break;
+ case UDT:
+ getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, userDefinedTypesClass.get(getterExpr));
+ break;
+ default:
+ getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, Object.class);
+ break;
+ }
+ return getter;
+ }
+
+ /***
+ * Populates a given bound statement column with a value, given a POJO and the map representing the getters
+ * @param boundStatement The statement that needs to be populated with the value
+ * @param getters A map mapping the applicable getter for a given column name as key
+ * @param dataType The data type of the cassandra column name
+ * @param cassandraColName The name of the cassandra column
+ * @param pojoPayload The POJO from which the value needs to be extracted
+ * @param setNulls Whether nulls can be set explicitly
+ * @param codecsForCassandraColumnNames A map giving column name to codec with key as column name
+ */
+ @SuppressWarnings(value = "unchecked")
+ public static void populateBoundStatementWithValue(BoundStatement boundStatement, Map<String,Object> getters,
+ DataType dataType, String cassandraColName, Object pojoPayload, boolean setNulls,
+ Map<String,TypeCodec> codecsForCassandraColumnNames)
+ {
+ switch (dataType.getName()) {
+ case BOOLEAN:
+ PojoUtils.GetterBoolean<Object> boolGetter = ((PojoUtils.GetterBoolean<Object>)getters
+ .get(cassandraColName));
+ if (boolGetter != null) {
+ final boolean bool = boolGetter.get(pojoPayload);
+ boundStatement.setBool(cassandraColName, bool);
+ } else {
+ boundStatement.unset(cassandraColName);
+ }
+ break;
+ case INT:
+ PojoUtils.GetterInt<Object> inGetter = ((PojoUtils.GetterInt<Object>)getters.get(cassandraColName));
+ if (inGetter != null) {
+ final int intValue = inGetter.get(pojoPayload);
+ boundStatement.setInt(cassandraColName, intValue);
+ } else {
+ boundStatement.unset(cassandraColName);
+ }
+ break;
+ case BIGINT:
+ case COUNTER:
+ PojoUtils.GetterLong<Object> longGetter = ((PojoUtils.GetterLong<Object>)getters.get(cassandraColName));
+ if (longGetter != null) {
+ final long longValue = longGetter.get(pojoPayload);
+ boundStatement.setLong(cassandraColName, longValue);
+ } else {
+ boundStatement.unset(cassandraColName);
+ }
+ break;
+ case FLOAT:
+ PojoUtils.GetterFloat<Object> floatGetter = ((PojoUtils.GetterFloat<Object>)getters.get(cassandraColName));
+ if (floatGetter != null) {
+ final float floatValue = floatGetter.get(pojoPayload);
+ boundStatement.setFloat(cassandraColName, floatValue);
+ } else {
+ boundStatement.unset(cassandraColName);
+ }
+ break;
+ case DOUBLE:
+ PojoUtils.GetterDouble<Object> doubleGetter = ((PojoUtils.GetterDouble<Object>)getters
+ .get(cassandraColName));
+ if (doubleGetter != null) {
+ final double doubleValue = doubleGetter.get(pojoPayload);
+ boundStatement.setDouble(cassandraColName, doubleValue);
+ } else {
+ boundStatement.unset(cassandraColName);
+ }
+ break;
+ case DECIMAL:
+ PojoUtils.Getter<Object, BigDecimal> bigDecimalGetter = ((PojoUtils.Getter<Object, BigDecimal>)getters
+ .get(cassandraColName));
+ if (bigDecimalGetter != null) {
+ final BigDecimal decimal = bigDecimalGetter.get(pojoPayload);
+ if (decimal == null) {
+ if (!setNulls) {
+ boundStatement.unset(cassandraColName);
+ } else {
+ boundStatement.setDecimal(cassandraColName, null);
+ }
+ } else {
+ boundStatement.setDecimal(cassandraColName, decimal);
+ }
+ } else {
+ boundStatement.unset(cassandraColName);
+ }
+ break;
+ case UUID:
+ PojoUtils.Getter<Object, UUID> uuidGetter = ((PojoUtils.Getter<Object, UUID>)getters.get(cassandraColName));
+ if (uuidGetter != null) {
+ final UUID uuid = uuidGetter.get(pojoPayload);
+ if (uuid == null) {
+ if (!setNulls) {
+ boundStatement.unset(cassandraColName);
+ } else {
+ boundStatement.setUUID(cassandraColName, null);
+ }
+ } else {
+ boundStatement.setUUID(cassandraColName, uuid);
+ }
+ } else {
+ boundStatement.unset(cassandraColName);
+ }
+ break;
+ case ASCII:
+ case VARCHAR:
+ case TEXT:
+ PojoUtils.Getter<Object, String> stringGetter = ((PojoUtils.Getter<Object, String>)getters
+ .get(cassandraColName));
+ if (stringGetter != null) {
+ final String ascii = stringGetter.get(pojoPayload);
+ if (ascii == null) {
+ if (!setNulls) {
+ boundStatement.unset(cassandraColName);
+ } else {
+ boundStatement.setString(cassandraColName, null);
+ }
+ } else {
+ boundStatement.setString(cassandraColName, ascii);
+ }
+ } else {
+ boundStatement.unset(cassandraColName);
+ }
+ break;
+ case SET:
+ PojoUtils.Getter<Object, Set<?>> getterForSet = ((PojoUtils.Getter<Object, Set<?>>)getters
+ .get(cassandraColName));
+ if (getterForSet != null) {
+ final Set<?> set = getterForSet.get(pojoPayload);
+ if (set == null) {
+ if (!setNulls) {
+ boundStatement.unset(cassandraColName);
+ } else {
+ boundStatement.setSet(cassandraColName, null);
+ }
+ } else {
+ boundStatement.setSet(cassandraColName, set);
+ }
+ } else {
+ boundStatement.unset(cassandraColName);
+ }
+ break;
+ case MAP:
+ PojoUtils.Getter<Object, Map<?, ?>> mapGetter = ((PojoUtils.Getter<Object, Map<?, ?>>)getters
+ .get(cassandraColName));
+ if (mapGetter != null) {
+ final Map<?, ?> map = mapGetter.get(pojoPayload);
+ if (map == null) {
+ if (!setNulls) {
+ boundStatement.unset(cassandraColName);
+ } else {
+ boundStatement.setMap(cassandraColName, null);
+ }
+ } else {
+ boundStatement.setMap(cassandraColName, map);
+ }
+ } else {
+ boundStatement.unset(cassandraColName);
+ }
+ break;
+ case LIST:
+ PojoUtils.Getter<Object, List<?>> listGetter = ((PojoUtils.Getter<Object, List<?>>)getters
+ .get(cassandraColName));
+ if (listGetter != null) {
+ final List<?> list = listGetter.get(pojoPayload);
+ if (list == null) {
+ if (!setNulls) {
+ boundStatement.unset(cassandraColName);
+ } else {
+ boundStatement.setList(cassandraColName, null);
+ }
+ } else {
+ boundStatement.setList(cassandraColName, list);
+ }
+ } else {
+ boundStatement.unset(cassandraColName);
+ }
+ break;
+ case TIMESTAMP:
+ PojoUtils.Getter<Object, Date> dateGetter = ((PojoUtils.Getter<Object, Date>)getters.get(cassandraColName));
+ if (dateGetter != null) {
+ final Date date = dateGetter.get(pojoPayload);
+ if (date == null) {
+ if (!setNulls) {
+ boundStatement.unset(cassandraColName);
+ } else {
+ boundStatement.setDate(cassandraColName, null);
+ }
+ } else {
+ boundStatement.setDate(cassandraColName, LocalDate.fromMillisSinceEpoch(date.getTime()));
+ }
+ } else {
+ boundStatement.unset(cassandraColName);
+ }
+ break;
+ case UDT:
+ PojoUtils.Getter<Object, Object> udtGetter = ((PojoUtils.Getter<Object, Object>)getters
+ .get(cassandraColName));
+ if (udtGetter != null) {
+ final Object udtPayload = udtGetter.get(pojoPayload);
+ if (udtPayload == null) {
+ if (!setNulls) {
+ boundStatement.unset(cassandraColName);
+ } else {
+ boundStatement.setUDTValue(cassandraColName, null);
+ }
+ } else {
+ boundStatement.set(cassandraColName, udtPayload, codecsForCassandraColumnNames
+ .get(cassandraColName).getJavaType().getRawType());
+ }
+ } else {
+ boundStatement.unset(cassandraColName);
+ }
+ break;
+ default:
+ throw new RuntimeException("Type not supported for " + dataType.getName());
+ }
+ }
+}
+
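A hedged sketch of how these two helpers are meant to be chained, mirroring registerGettersForPayload() and processPayloadForExecution() in AbstractUpsertOutputOperator above. The User POJO, its firstname field/column and the already prepared statement are assumptions, not part of this commit.

import java.util.HashMap;
import java.util.Map;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.TypeCodec;

// Illustrative only; the operator performs these two steps per column and per tuple.
Map<String, Object> getters = new HashMap<>();
Map<String, TypeCodec> codecs = new HashMap<>();   // empty: no user defined type columns here
DataType textType = DataType.text();

// 1. At activation time: resolve a getter for the (assumed) "firstname" field of User
getters.put("firstname", CassandraPojoUtils.resolveGetterForField(
    User.class, "firstname", textType, new HashMap<String, Class>()));

// 2. Per tuple: copy the POJO value into a statement bound from an (assumed) prepared statement
BoundStatement stmt = preparedStatement.bind();
CassandraPojoUtils.populateBoundStatementWithValue(
    stmt, getters, textType, "firstname", userPojo, true /* set explicit nulls */, codecs);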
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPreparedStatementGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPreparedStatementGenerator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPreparedStatementGenerator.java
new file mode 100644
index 0000000..355fccf
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPreparedStatementGenerator.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+
+/**
+ * Generates the CQL UPDATE query strings from which the prepared statements of the upsert
+ * output operator are created, one per supported combination of mutation contexts.
+ */
+public class CassandraPreparedStatementGenerator
+{
+
+ private Set<String> pkColumnNames;
+
+ private Set<String> counterColumns;
+
+ private Set<String> listColumns;
+
+ private Set<String> mapColumns;
+
+ private Set<String> setColumns;
+
+ private Map<String, DataType> columnDefinitions;
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(CassandraPreparedStatementGenerator.class);
+
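+ /** Name of the bind marker used for the TTL value in the generated "USING ttl :ttl" clause. */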
+ public static final String TTL_PARAM_NAME = "ttl";
+
+ public CassandraPreparedStatementGenerator(Set<String> pkColumnNames, Set<String> counterColumns,
+ Set<String> listColumns, Set<String> mapColumns, Set<String> setColumns,
+ Map<String, DataType> columnDefinitions)
+ {
+ this.pkColumnNames = pkColumnNames;
+ this.counterColumns = counterColumns;
+ this.listColumns = listColumns;
+ this.mapColumns = mapColumns;
+ this.setColumns = setColumns;
+ this.columnDefinitions = columnDefinitions;
+ }
+
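+ /**
+ * Prepares all supported variants of the UPDATE statement against the given session and
+ * registers them in preparedStatementTypes, keyed by the slot index derived from the
+ * combination of OperationContext values. For every base variant an "IF EXISTS" flavour is
+ * also prepared, except for counter column tables where Cassandra does not allow it.
+ */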
+ public void generatePreparedStatements(Session session,Map<Long, PreparedStatement> preparedStatementTypes,
+ String keyspaceName,String tableName)
+ {
+
+ Map<Long, String> stringsWithoutPKAndExistsClauses = generatePreparedStatementsQueryStrings(keyspaceName,tableName);
+ String ifExistsClause = " IF EXISTS";
+ Map<Long, String> finalSetOfQueryStrings = new HashMap<>();
+ for (Long currentIndexPos : stringsWithoutPKAndExistsClauses.keySet()) {
+ StringBuilder aQueryStub = new StringBuilder(stringsWithoutPKAndExistsClauses.get(currentIndexPos));
+ buildWhereClauseForPrimaryKeys(aQueryStub);
+ finalSetOfQueryStrings.put(currentIndexPos +
+ getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+ AbstractUpsertOutputOperator.OperationContext.IF_EXISTS_CHECK_ABSENT)),
+ aQueryStub.toString());
+ if (counterColumns.size() == 0) {
+ // IF EXISTS cannot be used for counter column tables
+ finalSetOfQueryStrings.put(currentIndexPos +
+ getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+ AbstractUpsertOutputOperator.OperationContext.IF_EXISTS_CHECK_PRESENT)),
+ aQueryStub.toString() + ifExistsClause);
+ }
+ }
+ for (Long currentIndexPos : finalSetOfQueryStrings.keySet()) {
+ String currentQueryStr = finalSetOfQueryStrings.get(currentIndexPos);
+ LOG.info("Registering query support for " + currentQueryStr);
+ PreparedStatement preparedStatementForThisQuery = session.prepare(currentQueryStr);
+ preparedStatementTypes.put(currentIndexPos, preparedStatementForThisQuery);
+ }
+ }
+
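+ /**
+ * Appends a WHERE clause that equates each primary key column with a named bind marker of the
+ * same name, e.g. " WHERE id = :id AND city = :city" for primary key columns id and city.
+ */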
+ private void buildWhereClauseForPrimaryKeys(final StringBuilder queryExpression)
+ {
+ queryExpression.append(" WHERE ");
+ int count = 0;
+ for (String pkColName : pkColumnNames) {
+ if (count > 0) {
+ queryExpression.append(" AND ");
+ }
+ count += 1;
+ queryExpression.append(" ").append(pkColName).append(" = :").append(pkColName);
+ }
+ }
+
+
+ private void buildQueryStringForTTLSetCollectionsAppendAndListPrepend(StringBuilder updateQueryRoot,
+ String ttlSetString, Map<Long,String> queryStrings)
+ {
+ // TTL set, Collections Append, List prepend
+ StringBuilder queryExpTTLSetCollAppendListPrepend = new StringBuilder(updateQueryRoot.toString());
+ queryExpTTLSetCollAppendListPrepend.append(ttlSetString);
+ buildNonPKColumnsExpression(queryExpTTLSetCollAppendListPrepend,
+ UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST,
+ UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
+ queryStrings.put(getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+ AbstractUpsertOutputOperator.OperationContext.TTL_SET,
+ AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_APPEND,
+ AbstractUpsertOutputOperator.OperationContext.LIST_PREPEND
+ )), queryExpTTLSetCollAppendListPrepend.toString());
+ }
+
+ private void buildQueryStringForTTLSetCollectionsAppendAndListAppend(StringBuilder updateQueryRoot,
+ String ttlSetString,Map<Long,String> queryStrings)
+ {
+ // TTL set, Collections Append, List append
+ StringBuilder queryExpTTLSetCollAppendListAppend = new StringBuilder(updateQueryRoot.toString());
+ queryExpTTLSetCollAppendListAppend.append(ttlSetString);
+ buildNonPKColumnsExpression(queryExpTTLSetCollAppendListAppend,
+ UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST,
+ UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
+ queryStrings.put(getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+ AbstractUpsertOutputOperator.OperationContext.TTL_SET,
+ AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_APPEND,
+ AbstractUpsertOutputOperator.OperationContext.LIST_APPEND
+ )), queryExpTTLSetCollAppendListAppend.toString());
+ }
+
+ private void buildQueryStringForTTLSetCollectionsRemove(StringBuilder updateQueryRoot,
+ String ttlSetString,Map<Long,String> queryStrings)
+ {
+ // TTL set, Collections Remove
+ StringBuilder queryExpTTLSetCollRemove = new StringBuilder(updateQueryRoot.toString());
+ queryExpTTLSetCollRemove.append(ttlSetString);
+ buildNonPKColumnsExpression(queryExpTTLSetCollRemove,
+ UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST, // Just in case the user sets it
+ UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION);
+ queryStrings.put(getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+ AbstractUpsertOutputOperator.OperationContext.TTL_SET,
+ AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_REMOVE,
+ AbstractUpsertOutputOperator.OperationContext.LIST_APPEND
+ )), queryExpTTLSetCollRemove.toString());
+ }
+
+ private void buildQueryStringForTTLNotSetCollectionsAppendWithListPrepend(StringBuilder updateQueryRoot,
+ Map<Long,String> queryStrings)
+ {
+ // TTL not set, Collections Append, List prepend
+ StringBuilder queryExpTTLNotSetCollAppendListPrepend = new StringBuilder(updateQueryRoot.toString());
+ queryExpTTLNotSetCollAppendListPrepend.append(" SET ");
+ buildNonPKColumnsExpression(queryExpTTLNotSetCollAppendListPrepend,
+ UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST,
+ UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
+ queryStrings.put(getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+ AbstractUpsertOutputOperator.OperationContext.TTL_NOT_SET,
+ AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_APPEND,
+ AbstractUpsertOutputOperator.OperationContext.LIST_PREPEND
+ )), queryExpTTLNotSetCollAppendListPrepend.toString());
+ }
+
+ private void buildQueryStringForTTLNotSetCollectionsAppendWithListAppend(StringBuilder updateQueryRoot,
+ Map<Long,String> queryStrings)
+ {
+ // TTL not set, Collections Append, List append
+ StringBuilder queryExpTTLNotSetCollAppendListAppend = new StringBuilder(updateQueryRoot.toString());
+ queryExpTTLNotSetCollAppendListAppend.append(" SET ");
+ buildNonPKColumnsExpression(queryExpTTLNotSetCollAppendListAppend,
+ UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST,
+ UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
+ queryStrings.put(getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+ AbstractUpsertOutputOperator.OperationContext.TTL_NOT_SET,
+ AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_APPEND,
+ AbstractUpsertOutputOperator.OperationContext.LIST_APPEND
+ )), queryExpTTLNotSetCollAppendListAppend.toString());
+ }
+
+
+ private void buildQueryStringForTTLNotSetCollectionsRemove(StringBuilder updateQueryRoot,
+ Map<Long,String> queryStrings)
+ {
+ // TTL not set, Collections Remove
+ StringBuilder queryExpTTLNotSetCollRemove = new StringBuilder(updateQueryRoot.toString());
+ queryExpTTLNotSetCollRemove.append(" SET ");
+ buildNonPKColumnsExpression(queryExpTTLNotSetCollRemove,
+ UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST, // Just in case the user sets it
+ UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION);
+ queryStrings.put(getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+ AbstractUpsertOutputOperator.OperationContext.TTL_NOT_SET,
+ AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_REMOVE,
+ AbstractUpsertOutputOperator.OperationContext.LIST_APPEND
+ )), queryExpTTLNotSetCollRemove.toString());
+ }
+
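+ /**
+ * Builds the six base UPDATE query strings: TTL set or not set, combined with collections
+ * append (list append or list prepend) and collections remove. The WHERE clause and the
+ * optional IF EXISTS suffix are added later in generatePreparedStatements().
+ */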
+ private Map<Long, String> generatePreparedStatementsQueryStrings(String keyspaceName,String tableName)
+ {
+ Map<Long, String> queryStrings = new HashMap<>();
+ //UPDATE keyspace_name.table_name USING option AND option SET assignment, assignment, ... WHERE row_specification
+ StringBuilder updateQueryRoot = new StringBuilder(" UPDATE " + keyspaceName +
+ "." + tableName + " ");
+ String ttlSetString = " USING ttl :" + TTL_PARAM_NAME + " SET ";
+ buildQueryStringForTTLSetCollectionsAppendAndListPrepend(updateQueryRoot,ttlSetString,queryStrings);
+ buildQueryStringForTTLSetCollectionsAppendAndListAppend(updateQueryRoot,ttlSetString,queryStrings);
+ buildQueryStringForTTLSetCollectionsRemove(updateQueryRoot,ttlSetString,queryStrings);
+ buildQueryStringForTTLNotSetCollectionsAppendWithListPrepend(updateQueryRoot,queryStrings);
+ buildQueryStringForTTLNotSetCollectionsAppendWithListAppend(updateQueryRoot,queryStrings);
+ buildQueryStringForTTLNotSetCollectionsRemove(updateQueryRoot,queryStrings);
+ return queryStrings;
+ }
+
+
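+ /**
+ * Computes the map key (slot index) for a combination of OperationContext values: each context
+ * contributes 10 raised to its ordinal, i.e. one decimal digit per context, so every distinct
+ * combination of contexts maps to a distinct long.
+ */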
+ public static long getSlotIndexForMutationContextPreparedStatement(
+ final EnumSet<AbstractUpsertOutputOperator.OperationContext> context)
+ {
+ Iterator<AbstractUpsertOutputOperator.OperationContext> itrForContexts = context.iterator();
+ long indexValue = 0;
+ while (itrForContexts.hasNext()) {
+ AbstractUpsertOutputOperator.OperationContext aContext = itrForContexts.next();
+ indexValue += Math.pow(10, aContext.ordinal());
+ }
+ return indexValue;
+ }
+
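+ /**
+ * Appends the SET assignments for all non primary key columns: counter columns become
+ * "col = col + :col", simple and frozen collection columns become "col = :col", and non-frozen
+ * collections are mutated in place ("col = col + :col" or "col = col - :col", with
+ * ":col + col" when prepending to a list).
+ */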
+ private void buildNonPKColumnsExpression(final StringBuilder queryExpression,
+ UpsertExecutionContext.ListPlacementStyle listPlacementStyle,
+ UpsertExecutionContext.CollectionMutationStyle collectionMutationStyle)
+ {
+ int count = 0;
+ for (String colNameEntry : columnDefinitions.keySet()) {
+ if (pkColumnNames.contains(colNameEntry)) {
+ continue;
+ }
+ if (count > 0) {
+ queryExpression.append(",");
+ }
+ count += 1;
+ if (counterColumns.contains(colNameEntry)) {
+ queryExpression.append(" " + colNameEntry + " = " + colNameEntry + " + :" + colNameEntry);
+ continue;
+ }
+ DataType dataType = columnDefinitions.get(colNameEntry);
+ if ((!dataType.isCollection()) && (!counterColumns.contains(colNameEntry))) {
+ queryExpression.append(" " + colNameEntry + " = :" + colNameEntry);
+ continue;
+ }
+ if ((dataType.isCollection()) && (!dataType.isFrozen())) {
+ if (collectionMutationStyle == UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION) {
+ queryExpression.append(" " + colNameEntry + " = " + colNameEntry + " - :" + colNameEntry);
+ }
+ if (collectionMutationStyle == UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION) {
+ if ((setColumns.contains(colNameEntry)) || (mapColumns.contains(colNameEntry))) {
+ queryExpression.append(" " + colNameEntry + " = " + colNameEntry + " + :" + colNameEntry);
+ }
+ if ((listColumns.contains(colNameEntry)) &&
+ (listPlacementStyle == UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST)) {
+ queryExpression.append(" " + colNameEntry + " = " + colNameEntry + " + :" + colNameEntry);
+ }
+ if ((listColumns.contains(colNameEntry)) &&
+ (listPlacementStyle == UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST)) {
+ queryExpression.append(" " + colNameEntry + " = :" + colNameEntry + " + " + colNameEntry);
+ }
+ }
+ } else {
+ if ((dataType.isCollection()) && (dataType.isFrozen())) {
+ queryExpression.append(" " + colNameEntry + " = :" + colNameEntry);
+ }
+ }
+ }
+ }
+
+}
+