You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/08/06 16:23:22 UTC
[3/6] git commit: Remove CPRR/CPIF.
Remove CPRR/CPIF.
Patch by brandonwilliams for CASSANDRA-7570
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7fa93a2c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7fa93a2c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7fa93a2c
Branch: refs/heads/trunk
Commit: 7fa93a2ca7febbff593aafef0265daa8799a9fb3
Parents: f83909e
Author: Brandon Williams <br...@apache.org>
Authored: Wed Aug 6 09:21:14 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Aug 6 09:21:14 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 5 +-
.../hadoop/cql3/CqlPagingInputFormat.java | 85 --
.../hadoop/cql3/CqlPagingRecordReader.java | 800 -------------------
4 files changed, 5 insertions(+), 886 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7fa93a2c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dcc5bf8..49cb6a1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.10
+ * Remove CqlPagingRecordReader/CqlPagingInputFormat (CASSANDRA-7570)
* Fix IncompatibleClassChangeError from hadoop2 (CASSANDRA-7229)
* Add 'nodetool sethintedhandoffthrottlekb' (CASSANDRA-7635)
* Update java driver (for hadoop) (CASSANDRA-7618)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7fa93a2c/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 93fe0b1..0491384 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -17,7 +17,10 @@ using the provided 'sstableupgrade' tool.
====
New features
------------
- - If you are using Leveled Compaction, you can now disable doing size-tiered
+ - CqlPaginRecordReader and CqlPagingInputFormat have both been removed.
+ Use CqlInputFormat instead.
+ - If you are using Leveled Compaction, you can now disable doing
+ size-tiered
compaction in L0 by starting Cassandra with -Dcassandra.disable_stcs_in_l0
(see CASSANDRA-6621 for details).
- Shuffle and taketoken have been removed. For clusters that choose to
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7fa93a2c/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
deleted file mode 100644
index 96f2f94..0000000
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.cql3;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.cassandra.hadoop.HadoopCompat;
-import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
-import org.apache.cassandra.hadoop.ReporterWrapper;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-/**
- * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
- *
- * At minimum, you need to set the KS and CF in your Hadoop job Configuration.
- * The ConfigHelper class is provided to make this
- * simple:
- * ConfigHelper.setInputColumnFamily
- *
- * You can also configure the number of rows per InputSplit with
- * ConfigHelper.setInputSplitSize. The default split size is 64k rows.
- * the number of CQL rows per page
- *
- * the number of CQL rows per page
- * CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You
- * should set it to "as big as possible, but no bigger." It set the LIMIT for the CQL
- * query, so you need set it big enough to minimize the network overhead, and also
- * not too big to avoid out of memory issue.
- *
- * the column names of the select CQL query. The default is all columns
- * CQLConfigHelper.setInputColumns
- *
- * the user defined the where clause
- * CQLConfigHelper.setInputWhereClauses. The default is no user defined where clause
- */
-public class CqlPagingInputFormat extends AbstractColumnFamilyInputFormat<Map<String, ByteBuffer>, Map<String, ByteBuffer>>
-{
- public RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
- throws IOException
- {
- TaskAttemptContext tac = HadoopCompat.newMapContext(
- jobConf,
- TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)),
- null,
- null,
- null,
- new ReporterWrapper(reporter),
- null);
-
- CqlPagingRecordReader recordReader = new CqlPagingRecordReader();
- recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
- return recordReader;
- }
-
- @Override
- public org.apache.hadoop.mapreduce.RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> createRecordReader(
- org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1) throws IOException,
- InterruptedException
- {
- return new CqlPagingRecordReader();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7fa93a2c/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
deleted file mode 100644
index 03d9ae9..0000000
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
+++ /dev/null
@@ -1,800 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.cql3;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.*;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.hadoop.HadoopCompat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.marshal.ReversedType;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.ColumnFamilySplit;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
-
-/**
- * Hadoop RecordReader read the values return from the CQL query
- * It use CQL key range query to page through the wide rows.
- * <p/>
- * Return List<IColumn> as keys columns
- * <p/>
- * Map<ByteBuffer, IColumn> as column name to columns mappings
- */
-public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>>
- implements org.apache.hadoop.mapred.RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>>
-{
- private static final Logger logger = LoggerFactory.getLogger(CqlPagingRecordReader.class);
-
- public static final int DEFAULT_CQL_PAGE_LIMIT = 1000; // TODO: find the number large enough but not OOM
-
- private ColumnFamilySplit split;
- private RowIterator rowIterator;
-
- private Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>> currentRow;
- private int totalRowCount; // total number of rows to fetch
- private String keyspace;
- private String cfName;
- private Cassandra.Client client;
- private ConsistencyLevel consistencyLevel;
-
- // partition keys -- key aliases
- private List<BoundColumn> partitionBoundColumns = new ArrayList<BoundColumn>();
-
- // cluster keys -- column aliases
- private List<BoundColumn> clusterColumns = new ArrayList<BoundColumn>();
-
- // map prepared query type to item id
- private Map<Integer, Integer> preparedQueryIds = new HashMap<Integer, Integer>();
-
- // cql query select columns
- private String columns;
-
- // the number of cql rows per page
- private int pageRowSize;
-
- // user defined where clauses
- private String userDefinedWhereClauses;
-
- private IPartitioner partitioner;
-
- private AbstractType<?> keyValidator;
-
- public CqlPagingRecordReader()
- {
- super();
- }
-
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
- {
- this.split = (ColumnFamilySplit) split;
- Configuration conf = HadoopCompat.getConfiguration(context);
- totalRowCount = (this.split.getLength() < Long.MAX_VALUE)
- ? (int) this.split.getLength()
- : ConfigHelper.getInputSplitSize(conf);
- cfName = ConfigHelper.getInputColumnFamily(conf);
- consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf));
- keyspace = ConfigHelper.getInputKeyspace(conf);
- columns = CqlConfigHelper.getInputcolumns(conf);
- userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf);
-
- Optional<Integer> pageRowSizeOptional = CqlConfigHelper.getInputPageRowSize(conf);
- try
- {
- pageRowSize = pageRowSizeOptional.isPresent() ? pageRowSizeOptional.get() : DEFAULT_CQL_PAGE_LIMIT;
- }
- catch(NumberFormatException e)
- {
- pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
- }
-
- partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context));
-
- try
- {
- if (client != null)
- return;
-
- // create connection using thrift
- String[] locations = split.getLocations();
- Exception lastException = null;
- for (String location : locations)
- {
- int port = ConfigHelper.getInputRpcPort(conf);
- try
- {
- client = CqlPagingInputFormat.createAuthenticatedClient(location, port, conf);
- break;
- }
- catch (Exception e)
- {
- lastException = e;
- logger.warn("Failed to create authenticated client to {}:{}", location , port);
- }
- }
- if (client == null && lastException != null)
- throw lastException;
-
- // retrieve partition keys and cluster keys from system.schema_columnfamilies table
- retrieveKeys();
-
- client.set_keyspace(keyspace);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- rowIterator = new RowIterator();
-
- logger.debug("created {}", rowIterator);
- }
-
- public void close()
- {
- if (client != null)
- {
- TTransport transport = client.getOutputProtocol().getTransport();
- if (transport.isOpen())
- transport.close();
- client = null;
- }
- }
-
- public Map<String, ByteBuffer> getCurrentKey()
- {
- return currentRow.left;
- }
-
- public Map<String, ByteBuffer> getCurrentValue()
- {
- return currentRow.right;
- }
-
- public float getProgress()
- {
- if (!rowIterator.hasNext())
- return 1.0F;
-
- // the progress is likely to be reported slightly off the actual but close enough
- float progress = ((float) rowIterator.totalRead / totalRowCount);
- return progress > 1.0F ? 1.0F : progress;
- }
-
- public boolean nextKeyValue() throws IOException
- {
- if (!rowIterator.hasNext())
- {
- logger.debug("Finished scanning {} rows (estimate was: {})", rowIterator.totalRead, totalRowCount);
- return false;
- }
-
- try
- {
- currentRow = rowIterator.next();
- }
- catch (Exception e)
- {
- // throw it as IOException, so client can catch it and handle it at client side
- IOException ioe = new IOException(e.getMessage());
- ioe.initCause(ioe.getCause());
- throw ioe;
- }
- return true;
- }
-
- // we don't use endpointsnitch since we are trying to support hadoop nodes that are
- // not necessarily on Cassandra machines, too. This should be adequate for single-DC clusters, at least.
- private String[] getLocations()
- {
- Collection<InetAddress> localAddresses = FBUtilities.getAllLocalAddresses();
-
- for (InetAddress address : localAddresses)
- {
- for (String location : split.getLocations())
- {
- InetAddress locationAddress;
- try
- {
- locationAddress = InetAddress.getByName(location);
- }
- catch (UnknownHostException e)
- {
- throw new AssertionError(e);
- }
- if (address.equals(locationAddress))
- {
- return new String[] { location };
- }
- }
- }
- return split.getLocations();
- }
-
- // Because the old Hadoop API wants us to write to the key and value
- // and the new asks for them, we need to copy the output of the new API
- // to the old. Thus, expect a small performance hit.
- // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
- // and ColumnFamilyRecordReader don't support them, it should be fine for now.
- public boolean next(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> value) throws IOException
- {
- if (nextKeyValue())
- {
- value.clear();
- value.putAll(getCurrentValue());
-
- keys.clear();
- keys.putAll(getCurrentKey());
-
- return true;
- }
- return false;
- }
-
- public long getPos() throws IOException
- {
- return (long) rowIterator.totalRead;
- }
-
- public Map<String, ByteBuffer> createKey()
- {
- return new LinkedHashMap<String, ByteBuffer>();
- }
-
- public Map<String, ByteBuffer> createValue()
- {
- return new LinkedHashMap<String, ByteBuffer>();
- }
-
- /** CQL row iterator */
- private class RowIterator extends AbstractIterator<Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>>>
- {
- protected int totalRead = 0; // total number of cf rows read
- protected Iterator<CqlRow> rows;
- private int pageRows = 0; // the number of cql rows read of this page
- private String previousRowKey = null; // previous CF row key
- private String partitionKeyString; // keys in <key1>, <key2>, <key3> string format
- private String partitionKeyMarkers; // question marks in ? , ? , ? format which matches the number of keys
-
- public RowIterator()
- {
- // initial page
- executeQuery();
- }
-
- protected Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>> computeNext()
- {
- if (rows == null)
- return endOfData();
-
- int index = -2;
- //check there are more page to read
- while (!rows.hasNext())
- {
- // no more data
- if (index == -1 || emptyPartitionKeyValues())
- {
- logger.debug("no more data");
- return endOfData();
- }
-
- index = setTailNull(clusterColumns);
- logger.debug("set tail to null, index: {}", index);
- executeQuery();
- pageRows = 0;
-
- if (rows == null || !rows.hasNext() && index < 0)
- {
- logger.debug("no more data");
- return endOfData();
- }
- }
-
- Map<String, ByteBuffer> valueColumns = createValue();
- Map<String, ByteBuffer> keyColumns = createKey();
- int i = 0;
- CqlRow row = rows.next();
- for (Column column : row.columns)
- {
- String columnName = stringValue(ByteBuffer.wrap(column.getName()));
- logger.debug("column: {}", columnName);
-
- if (i < partitionBoundColumns.size() + clusterColumns.size())
- keyColumns.put(stringValue(column.name), column.value);
- else
- valueColumns.put(stringValue(column.name), column.value);
-
- i++;
- }
-
- // increase total CQL row read for this page
- pageRows++;
-
- // increase total CF row read
- if (newRow(keyColumns, previousRowKey))
- totalRead++;
-
- // read full page
- if (pageRows >= pageRowSize || !rows.hasNext())
- {
- Iterator<String> newKeys = keyColumns.keySet().iterator();
- for (BoundColumn column : partitionBoundColumns)
- column.value = keyColumns.get(newKeys.next());
-
- for (BoundColumn column : clusterColumns)
- column.value = keyColumns.get(newKeys.next());
-
- executeQuery();
- pageRows = 0;
- }
-
- return Pair.create(keyColumns, valueColumns);
- }
-
- /** check whether start to read a new CF row by comparing the partition keys */
- private boolean newRow(Map<String, ByteBuffer> keyColumns, String previousRowKey)
- {
- if (keyColumns.isEmpty())
- return false;
-
- String rowKey = "";
- if (keyColumns.size() == 1)
- {
- rowKey = partitionBoundColumns.get(0).validator.getString(keyColumns.get(partitionBoundColumns.get(0).name));
- }
- else
- {
- Iterator<ByteBuffer> iter = keyColumns.values().iterator();
- for (BoundColumn column : partitionBoundColumns)
- rowKey = rowKey + column.validator.getString(ByteBufferUtil.clone(iter.next())) + ":";
- }
-
- logger.debug("previous RowKey: {}, new row key: {}", previousRowKey, rowKey);
- if (previousRowKey == null)
- {
- this.previousRowKey = rowKey;
- return true;
- }
-
- if (rowKey.equals(previousRowKey))
- return false;
-
- this.previousRowKey = rowKey;
- return true;
- }
-
- /** set the last non-null key value to null, and return the previous index */
- private int setTailNull(List<BoundColumn> values)
- {
- if (values.isEmpty())
- return -1;
-
- Iterator<BoundColumn> iterator = values.iterator();
- int previousIndex = -1;
- BoundColumn current;
- while (iterator.hasNext())
- {
- current = iterator.next();
- if (current.value == null)
- {
- int index = previousIndex > 0 ? previousIndex : 0;
- BoundColumn column = values.get(index);
- logger.debug("set key {} value to null", column.name);
- column.value = null;
- return previousIndex - 1;
- }
-
- previousIndex++;
- }
-
- BoundColumn column = values.get(previousIndex);
- logger.debug("set key {} value to null", column.name);
- column.value = null;
- return previousIndex - 1;
- }
-
- /** serialize the prepared query, pair.left is query id, pair.right is query */
- private Pair<Integer, String> composeQuery(String columns)
- {
- Pair<Integer, String> clause = whereClause();
- if (columns == null)
- {
- columns = "*";
- }
- else
- {
- // add keys in the front in order
- String partitionKey = keyString(partitionBoundColumns);
- String clusterKey = keyString(clusterColumns);
-
- columns = withoutKeyColumns(columns);
- columns = (clusterKey == null || "".equals(clusterKey))
- ? partitionKey + (columns != null ? ("," + columns) : "")
- : partitionKey + "," + clusterKey + (columns != null ? ("," + columns) : "");
- }
-
- String whereStr = userDefinedWhereClauses == null ? "" : " AND " + userDefinedWhereClauses;
- return Pair.create(clause.left,
- String.format("SELECT %s FROM %s%s%s LIMIT %d ALLOW FILTERING",
- columns, quote(cfName), clause.right, whereStr, pageRowSize));
- }
-
-
- /** remove key columns from the column string */
- private String withoutKeyColumns(String columnString)
- {
- Set<String> keyNames = new HashSet<String>();
- for (BoundColumn column : Iterables.concat(partitionBoundColumns, clusterColumns))
- keyNames.add(column.name);
-
- String[] columns = columnString.split(",");
- String result = null;
- for (String column : columns)
- {
- String trimmed = column.trim();
- if (keyNames.contains(trimmed))
- continue;
-
- String quoted = quote(trimmed);
- result = result == null ? quoted : result + "," + quoted;
- }
- return result;
- }
-
- /** serialize the where clause */
- private Pair<Integer, String> whereClause()
- {
- if (partitionKeyString == null)
- partitionKeyString = keyString(partitionBoundColumns);
-
- if (partitionKeyMarkers == null)
- partitionKeyMarkers = partitionKeyMarkers();
- // initial query token(k) >= start_token and token(k) <= end_token
- if (emptyPartitionKeyValues())
- return Pair.create(0, String.format(" WHERE token(%s) > ? AND token(%s) <= ?", partitionKeyString, partitionKeyString));
-
- // query token(k) > token(pre_partition_key) and token(k) <= end_token
- if (clusterColumns.size() == 0 || clusterColumns.get(0).value == null)
- return Pair.create(1,
- String.format(" WHERE token(%s) > token(%s) AND token(%s) <= ?",
- partitionKeyString, partitionKeyMarkers, partitionKeyString));
-
- // query token(k) = token(pre_partition_key) and m = pre_cluster_key_m and n > pre_cluster_key_n
- Pair<Integer, String> clause = whereClause(clusterColumns, 0);
- return Pair.create(clause.left,
- String.format(" WHERE token(%s) = token(%s) %s", partitionKeyString, partitionKeyMarkers, clause.right));
- }
-
- /** recursively serialize the where clause */
- private Pair<Integer, String> whereClause(List<BoundColumn> column, int position)
- {
- if (position == column.size() - 1 || column.get(position + 1).value == null)
- return Pair.create(position + 2, String.format(" AND %s %s ? ", quote(column.get(position).name), column.get(position).reversed ? " < " : " >"));
-
- Pair<Integer, String> clause = whereClause(column, position + 1);
- return Pair.create(clause.left, String.format(" AND %s = ? %s", quote(column.get(position).name), clause.right));
- }
-
- /** check whether all key values are null */
- private boolean emptyPartitionKeyValues()
- {
- for (BoundColumn column : partitionBoundColumns)
- {
- if (column.value != null)
- return false;
- }
- return true;
- }
-
- /** serialize the partition key string in format of <key1>, <key2>, <key3> */
- private String keyString(List<BoundColumn> columns)
- {
- String result = null;
- for (BoundColumn column : columns)
- result = result == null ? quote(column.name) : result + "," + quote(column.name);
-
- return result == null ? "" : result;
- }
-
- /** serialize the question marks for partition key string in format of ?, ? , ? */
- private String partitionKeyMarkers()
- {
- String result = null;
- for (BoundColumn column : partitionBoundColumns)
- result = result == null ? "?" : result + ",?";
-
- return result;
- }
-
- /** serialize the query binding variables, pair.left is query id, pair.right is the binding variables */
- private Pair<Integer, List<ByteBuffer>> preparedQueryBindValues()
- {
- List<ByteBuffer> values = new LinkedList<ByteBuffer>();
-
- // initial query token(k) >= start_token and token(k) <= end_token
- if (emptyPartitionKeyValues())
- {
- values.add(partitioner.getTokenValidator().fromString(split.getStartToken()));
- values.add(partitioner.getTokenValidator().fromString(split.getEndToken()));
- return Pair.create(0, values);
- }
- else
- {
- for (BoundColumn partitionBoundColumn1 : partitionBoundColumns)
- values.add(partitionBoundColumn1.value);
-
- if (clusterColumns.size() == 0 || clusterColumns.get(0).value == null)
- {
- // query token(k) > token(pre_partition_key) and token(k) <= end_token
- values.add(partitioner.getTokenValidator().fromString(split.getEndToken()));
- return Pair.create(1, values);
- }
- else
- {
- // query token(k) = token(pre_partition_key) and m = pre_cluster_key_m and n > pre_cluster_key_n
- int type = preparedQueryBindValues(clusterColumns, 0, values);
- return Pair.create(type, values);
- }
- }
- }
-
- /** recursively serialize the query binding variables */
- private int preparedQueryBindValues(List<BoundColumn> column, int position, List<ByteBuffer> bindValues)
- {
- if (position == column.size() - 1 || column.get(position + 1).value == null)
- {
- bindValues.add(column.get(position).value);
- return position + 2;
- }
- else
- {
- bindValues.add(column.get(position).value);
- return preparedQueryBindValues(column, position + 1, bindValues);
- }
- }
-
- /** get the prepared query item Id */
- private int prepareQuery(int type) throws InvalidRequestException, TException
- {
- Integer itemId = preparedQueryIds.get(type);
- if (itemId != null)
- return itemId;
-
- Pair<Integer, String> query = null;
- query = composeQuery(columns);
- logger.debug("type: {}, query: {}", query.left, query.right);
- CqlPreparedResult cqlPreparedResult = client.prepare_cql3_query(ByteBufferUtil.bytes(query.right), Compression.NONE);
- preparedQueryIds.put(query.left, cqlPreparedResult.itemId);
- return cqlPreparedResult.itemId;
- }
-
- /** Quoting for working with uppercase */
- private String quote(String identifier)
- {
- return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";
- }
-
- /** execute the prepared query */
- private void executeQuery()
- {
- Pair<Integer, List<ByteBuffer>> bindValues = preparedQueryBindValues();
- logger.debug("query type: {}", bindValues.left);
-
- // check whether it reach end of range for type 1 query CASSANDRA-5573
- if (bindValues.left == 1 && reachEndRange())
- {
- rows = null;
- return;
- }
-
- int retries = 0;
- // only try three times for TimedOutException and UnavailableException
- while (retries < 3)
- {
- try
- {
- CqlResult cqlResult = client.execute_prepared_cql3_query(prepareQuery(bindValues.left), bindValues.right, consistencyLevel);
- if (cqlResult != null && cqlResult.rows != null)
- rows = cqlResult.rows.iterator();
- return;
- }
- catch (TimedOutException e)
- {
- retries++;
- if (retries >= 3)
- {
- rows = null;
- RuntimeException rte = new RuntimeException(e.getMessage());
- rte.initCause(e);
- throw rte;
- }
- }
- catch (UnavailableException e)
- {
- retries++;
- if (retries >= 3)
- {
- rows = null;
- RuntimeException rte = new RuntimeException(e.getMessage());
- rte.initCause(e);
- throw rte;
- }
- }
- catch (Exception e)
- {
- rows = null;
- RuntimeException rte = new RuntimeException(e.getMessage());
- rte.initCause(e);
- throw rte;
- }
- }
- }
- }
-
- /** retrieve the partition keys and cluster keys from system.schema_columnfamilies table */
- private void retrieveKeys() throws Exception
- {
- String query = "select key_aliases," +
- "column_aliases, " +
- "key_validator, " +
- "comparator " +
- "from system.schema_columnfamilies " +
- "where keyspace_name='%s' and columnfamily_name='%s'";
- String formatted = String.format(query, keyspace, cfName);
- CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(formatted), Compression.NONE, ConsistencyLevel.ONE);
-
- CqlRow cqlRow = result.rows.get(0);
- String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
- logger.debug("partition keys: {}", keyString);
- List<String> keys = FBUtilities.fromJsonList(keyString);
-
- for (String key : keys)
- partitionBoundColumns.add(new BoundColumn(key));
-
- keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
- logger.debug("cluster columns: {}", keyString);
- keys = FBUtilities.fromJsonList(keyString);
-
- for (String key : keys)
- clusterColumns.add(new BoundColumn(key));
-
- Column rawKeyValidator = cqlRow.columns.get(2);
- String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue()));
- logger.debug("row key validator: {}", validator);
- keyValidator = parseType(validator);
-
- if (keyValidator instanceof CompositeType)
- {
- List<AbstractType<?>> types = ((CompositeType) keyValidator).types;
- for (int i = 0; i < partitionBoundColumns.size(); i++)
- partitionBoundColumns.get(i).validator = types.get(i);
- }
- else
- {
- partitionBoundColumns.get(0).validator = keyValidator;
- }
-
- Column rawComparator = cqlRow.columns.get(3);
- String comparator = ByteBufferUtil.string(ByteBuffer.wrap(rawComparator.getValue()));
- logger.debug("comparator: {}", comparator);
- AbstractType comparatorValidator = parseType(comparator);
- if (comparatorValidator instanceof CompositeType)
- {
- for (int i = 0; i < clusterColumns.size(); i++)
- clusterColumns.get(i).reversed = (((CompositeType) comparatorValidator).types.get(i) instanceof ReversedType);
- }
- else if (comparatorValidator instanceof ReversedType)
- {
- clusterColumns.get(0).reversed = true;
- }
- }
-
- /** check whether current row is at the end of range */
- private boolean reachEndRange()
- {
- // current row key
- ByteBuffer rowKey;
- if (keyValidator instanceof CompositeType)
- {
- ByteBuffer[] keys = new ByteBuffer[partitionBoundColumns.size()];
- for (int i = 0; i < partitionBoundColumns.size(); i++)
- keys[i] = partitionBoundColumns.get(i).value.duplicate();
-
- rowKey = CompositeType.build(keys);
- }
- else
- {
- rowKey = partitionBoundColumns.get(0).value;
- }
-
- String endToken = split.getEndToken();
- String currentToken = partitioner.getToken(rowKey).toString();
- logger.debug("End token: {}, current token: {}", endToken, currentToken);
-
- return endToken.equals(currentToken);
- }
-
- private static AbstractType<?> parseType(String type) throws IOException
- {
- try
- {
- // always treat counters like longs, specifically CCT.serialize is not what we need
- if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
- return LongType.instance;
- return TypeParser.parse(type);
- }
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
- catch (SyntaxException e)
- {
- throw new IOException(e);
- }
- }
-
- private static class BoundColumn
- {
- final String name;
- ByteBuffer value;
- AbstractType<?> validator;
- boolean reversed = false;
-
- public BoundColumn(String name)
- {
- this.name = name;
- }
- }
-
- /** get string from a ByteBuffer, catch the exception and throw it as runtime exception*/
- private static String stringValue(ByteBuffer value)
- {
- try
- {
- return ByteBufferUtil.string(value);
- }
- catch (CharacterCodingException e)
- {
- throw new RuntimeException(e);
- }
- }
-}