You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by db...@apache.org on 2013/06/07 00:16:14 UTC
[1/5] git commit: cleanup
Updated Branches:
refs/heads/trunk 2e9f51d16 -> 6b16c1a57
cleanup
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ff5b8304
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ff5b8304
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ff5b8304
Branch: refs/heads/trunk
Commit: ff5b8304431477eb82fc13ab1344a2fc92c66a57
Parents: fd6d19b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Jun 6 16:47:03 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Jun 6 17:00:55 2013 -0500
----------------------------------------------------------------------
.../hadoop/cql3/ColumnFamilyRecordWriter.java | 43 +++++++--------
1 files changed, 20 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5b8304/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java
index 3939e0b..c32d9ef 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java
@@ -152,16 +152,16 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma
* (i.e., null), then the entire key is marked for {@link Deletion}.
* </p>
*
- * @param keybuff
+ * @param keyColumns
* the key to write.
* @param values
* the values to write.
* @throws IOException
*/
@Override
- public void write(Map<String, ByteBuffer> keys, List<ByteBuffer> values) throws IOException
+ public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException
{
- ByteBuffer rowKey = getRowKey(keys);
+ ByteBuffer rowKey = getRowKey(keyColumns);
Range<Token> range = ringCache.getRange(rowKey);
// get the client for the given range, or create a new one
@@ -180,7 +180,7 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma
/**
* A client that runs in a threadpool and connects to the list of endpoints for a particular
- * range. Binded variable values for keys in that range are sent to this client via a queue.
+ * range. Bound variables for keys in that range are sent to this client via a queue.
*/
public class RangeClient extends AbstractRangeClient<List<ByteBuffer>>
{
@@ -201,10 +201,10 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma
outer:
while (run || !queue.isEmpty())
{
- Pair<ByteBuffer, List<ByteBuffer>> bindVariables;
+ Pair<ByteBuffer, List<ByteBuffer>> item;
try
{
- bindVariables = queue.take();
+ item = queue.take();
}
catch (InterruptedException e)
{
@@ -220,15 +220,16 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma
{
int i = 0;
int itemId = preparedStatement(client);
- while (bindVariables != null)
+ while (item != null)
{
- client.execute_prepared_cql3_query(itemId, bindVariables.right, ConsistencyLevel.ONE);
+ List<ByteBuffer> bindVariables = item.right;
+ client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE);
i++;
if (i >= batchThreshold)
break;
- bindVariables = queue.poll();
+ item = queue.poll();
}
break;
@@ -293,7 +294,7 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma
}
}
- private ByteBuffer getRowKey(Map<String, ByteBuffer> keysMap)
+ private ByteBuffer getRowKey(Map<String, ByteBuffer> keyColumns)
{
//current row key
ByteBuffer rowKey;
@@ -301,13 +302,13 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma
{
ByteBuffer[] keys = new ByteBuffer[partitionkeys.length];
for (int i = 0; i< keys.length; i++)
- keys[i] = keysMap.get(partitionkeys[i]);
+ keys[i] = keyColumns.get(partitionkeys[i]);
rowKey = ((CompositeType) keyValidator).build(keys);
}
else
{
- rowKey = keysMap.get(partitionkeys[0]);
+ rowKey = keyColumns.get(partitionkeys[0]);
}
return rowKey;
}
@@ -331,11 +332,11 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma
Column rawPartitionKeys = result.rows.get(0).columns.get(1);
String keyString = ByteBufferUtil.string(ByteBuffer.wrap(rawPartitionKeys.getValue()));
logger.debug("partition keys: " + keyString);
-
+
List<String> keys = FBUtilities.fromJsonList(keyString);
- partitionkeys = new String [keys.size()];
- int i=0;
- for (String key: keys)
+ partitionkeys = new String[keys.size()];
+ int i = 0;
+ for (String key : keys)
{
partitionkeys[i] = key;
i++;
@@ -372,13 +373,9 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma
}
finally
{
- if (client != null)
- {
- TTransport transport = client.getOutputProtocol().getTransport();
- if (transport.isOpen())
- transport.close();
- client = null;
- }
+ TTransport transport = client.getOutputProtocol().getTransport();
+ if (transport.isOpen())
+ transport.close();
}
throw new IOException("There are no endpoints");
}
[4/5] git commit: fix capitalization of file name
Posted by db...@apache.org.
fix capitalization of file name
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/60b3584e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/60b3584e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/60b3584e
Branch: refs/heads/trunk
Commit: 60b3584e01773a14984a8e4433307205c4e32ddc
Parents: a004779
Author: Dave Brosius <db...@apache.org>
Authored: Thu Jun 6 18:15:08 2013 -0400
Committer: Dave Brosius <db...@apache.org>
Committed: Thu Jun 6 18:15:08 2013 -0400
----------------------------------------------------------------------
.../cassandra/hadoop/cql3/CQLConfigHelper.java | 109 ---------------
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 109 +++++++++++++++
2 files changed, 109 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60b3584e/src/java/org/apache/cassandra/hadoop/cql3/CQLConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CQLConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CQLConfigHelper.java
deleted file mode 100644
index cb61d05..0000000
--- a/src/java/org/apache/cassandra/hadoop/cql3/CQLConfigHelper.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package org.apache.cassandra.hadoop.cql3;
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-import org.apache.hadoop.conf.Configuration;
-
-public class CqlConfigHelper
-{
- private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
- private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
- private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
- private static final String OUTPUT_CQL = "cassandra.output.cql";
-
- /**
- * Set the CQL columns for the input of this job.
- *
- * @param conf Job configuration you are about to run
- * @param columns
- */
- public static void setInputColumns(Configuration conf, String columns)
- {
- if (columns == null || columns.isEmpty())
- return;
-
- conf.set(INPUT_CQL_COLUMNS_CONFIG, columns);
- }
-
- /**
- * Set the CQL query Limit for the input of this job.
- *
- * @param conf Job configuration you are about to run
- * @param cqlPageRowSize
- */
- public static void setInputCQLPageRowSize(Configuration conf, String cqlPageRowSize)
- {
- if (cqlPageRowSize == null)
- {
- throw new UnsupportedOperationException("cql page row size may not be null");
- }
-
- conf.set(INPUT_CQL_PAGE_ROW_SIZE_CONFIG, cqlPageRowSize);
- }
-
- /**
- * Set the CQL user defined where clauses for the input of this job.
- *
- * @param conf Job configuration you are about to run
- * @param clauses
- */
- public static void setInputWhereClauses(Configuration conf, String clauses)
- {
- if (clauses == null || clauses.isEmpty())
- return;
-
- conf.set(INPUT_CQL_WHERE_CLAUSE_CONFIG, clauses);
- }
-
- /**
- * Set the CQL prepared statement for the output of this job.
- *
- * @param conf Job configuration you are about to run
- * @param cql
- */
- public static void setOutputCql(Configuration conf, String cql)
- {
- if (cql == null || cql.isEmpty())
- return;
-
- conf.set(OUTPUT_CQL, cql);
- }
-
-
- public static String getInputcolumns(Configuration conf)
- {
- return conf.get(INPUT_CQL_COLUMNS_CONFIG);
- }
-
- public static String getInputPageRowSize(Configuration conf)
- {
- return conf.get(INPUT_CQL_PAGE_ROW_SIZE_CONFIG);
- }
-
- public static String getInputWhereClauses(Configuration conf)
- {
- return conf.get(INPUT_CQL_WHERE_CLAUSE_CONFIG);
- }
-
- public static String getOutputCql(Configuration conf)
- {
- return conf.get(OUTPUT_CQL);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60b3584e/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
new file mode 100644
index 0000000..cb61d05
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -0,0 +1,109 @@
+package org.apache.cassandra.hadoop.cql3;
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+import org.apache.hadoop.conf.Configuration;
+
+public class CqlConfigHelper
+{
+ private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
+ private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
+ private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
+ private static final String OUTPUT_CQL = "cassandra.output.cql";
+
+ /**
+ * Set the CQL columns for the input of this job.
+ *
+ * @param conf Job configuration you are about to run
+ * @param columns
+ */
+ public static void setInputColumns(Configuration conf, String columns)
+ {
+ if (columns == null || columns.isEmpty())
+ return;
+
+ conf.set(INPUT_CQL_COLUMNS_CONFIG, columns);
+ }
+
+ /**
+ * Set the CQL query Limit for the input of this job.
+ *
+ * @param conf Job configuration you are about to run
+ * @param cqlPageRowSize
+ */
+ public static void setInputCQLPageRowSize(Configuration conf, String cqlPageRowSize)
+ {
+ if (cqlPageRowSize == null)
+ {
+ throw new UnsupportedOperationException("cql page row size may not be null");
+ }
+
+ conf.set(INPUT_CQL_PAGE_ROW_SIZE_CONFIG, cqlPageRowSize);
+ }
+
+ /**
+ * Set the CQL user defined where clauses for the input of this job.
+ *
+ * @param conf Job configuration you are about to run
+ * @param clauses
+ */
+ public static void setInputWhereClauses(Configuration conf, String clauses)
+ {
+ if (clauses == null || clauses.isEmpty())
+ return;
+
+ conf.set(INPUT_CQL_WHERE_CLAUSE_CONFIG, clauses);
+ }
+
+ /**
+ * Set the CQL prepared statement for the output of this job.
+ *
+ * @param conf Job configuration you are about to run
+ * @param cql
+ */
+ public static void setOutputCql(Configuration conf, String cql)
+ {
+ if (cql == null || cql.isEmpty())
+ return;
+
+ conf.set(OUTPUT_CQL, cql);
+ }
+
+
+ public static String getInputcolumns(Configuration conf)
+ {
+ return conf.get(INPUT_CQL_COLUMNS_CONFIG);
+ }
+
+ public static String getInputPageRowSize(Configuration conf)
+ {
+ return conf.get(INPUT_CQL_PAGE_ROW_SIZE_CONFIG);
+ }
+
+ public static String getInputWhereClauses(Configuration conf)
+ {
+ return conf.get(INPUT_CQL_WHERE_CLAUSE_CONFIG);
+ }
+
+ public static String getOutputCql(Configuration conf)
+ {
+ return conf.get(OUTPUT_CQL);
+ }
+}
[2/5] rename
Posted by db...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0047797/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
new file mode 100644
index 0000000..3a0f628
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.*;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.hadoop.ColumnFamilySplit;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * Hadoop RecordReader read the values return from the CQL query
+ * It use CQL key range query to page through the wide rows.
+ * <p/>
+ * Return List<IColumn> as keys columns
+ * <p/>
+ * Map<ByteBuffer, IColumn> as column name to columns mappings
+ */
+public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>>
+ implements org.apache.hadoop.mapred.RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>>
+{
+ private static final Logger logger = LoggerFactory.getLogger(CqlPagingRecordReader.class);
+
+ public static final int DEFAULT_CQL_PAGE_LIMIT = 1000; // TODO: find the number large enough but not OOM
+
+ private ColumnFamilySplit split;
+ private RowIterator rowIterator;
+
+ private Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>> currentRow;
+ private int totalRowCount; // total number of rows to fetch
+ private String keyspace;
+ private String cfName;
+ private Cassandra.Client client;
+ private ConsistencyLevel consistencyLevel;
+
+ // partition keys -- key aliases
+ private List<BoundColumn> partitionBoundColumns = new ArrayList<BoundColumn>();
+
+ // cluster keys -- column aliases
+ private List<BoundColumn> clusterColumns = new ArrayList<BoundColumn>();
+
+ // map prepared query type to item id
+ private Map<Integer, Integer> preparedQueryIds = new HashMap<Integer, Integer>();
+
+ // cql query select columns
+ private String columns;
+
+ // the number of cql rows per page
+ private int pageRowSize;
+
+ // user defined where clauses
+ private String userDefinedWhereClauses;
+
+ private IPartitioner partitioner;
+
+ private AbstractType<?> keyValidator;
+
+ public CqlPagingRecordReader()
+ {
+ super();
+ }
+
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
+ {
+ this.split = (ColumnFamilySplit) split;
+ Configuration conf = context.getConfiguration();
+ totalRowCount = (this.split.getLength() < Long.MAX_VALUE)
+ ? (int) this.split.getLength()
+ : ConfigHelper.getInputSplitSize(conf);
+ cfName = ConfigHelper.getInputColumnFamily(conf);
+ consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf));
+ keyspace = ConfigHelper.getInputKeyspace(conf);
+ columns = CqlConfigHelper.getInputcolumns(conf);
+ userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf);
+
+ try
+ {
+ pageRowSize = Integer.parseInt(CqlConfigHelper.getInputPageRowSize(conf));
+ }
+ catch (NumberFormatException e)
+ {
+ pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
+ }
+
+ partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
+
+ try
+ {
+ if (client != null)
+ return;
+
+ // create connection using thrift
+ String location = getLocation();
+
+ int port = ConfigHelper.getInputRpcPort(conf);
+ client = CqlPagingInputFormat.createAuthenticatedClient(location, port, conf);
+
+ // retrieve partition keys and cluster keys from system.schema_columnfamilies table
+ retrieveKeys();
+
+ client.set_keyspace(keyspace);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ rowIterator = new RowIterator();
+
+ logger.debug("created {}", rowIterator);
+ }
+
+ public void close()
+ {
+ if (client != null)
+ {
+ TTransport transport = client.getOutputProtocol().getTransport();
+ if (transport.isOpen())
+ transport.close();
+ client = null;
+ }
+ }
+
+ public Map<String, ByteBuffer> getCurrentKey()
+ {
+ return currentRow.left;
+ }
+
+ public Map<String, ByteBuffer> getCurrentValue()
+ {
+ return currentRow.right;
+ }
+
+ public float getProgress()
+ {
+ if (!rowIterator.hasNext())
+ return 1.0F;
+
+ // the progress is likely to be reported slightly off the actual but close enough
+ float progress = ((float) rowIterator.totalRead / totalRowCount);
+ return progress > 1.0F ? 1.0F : progress;
+ }
+
+ public boolean nextKeyValue() throws IOException
+ {
+ if (!rowIterator.hasNext())
+ {
+ logger.debug("Finished scanning " + rowIterator.totalRead + " rows (estimate was: " + totalRowCount + ")");
+ return false;
+ }
+
+ try
+ {
+ currentRow = rowIterator.next();
+ }
+ catch (Exception e)
+ {
+ // throw it as IOException, so client can catch it and handle it at client side
+ IOException ioe = new IOException(e.getMessage());
+ ioe.initCause(ioe.getCause());
+ throw ioe;
+ }
+ return true;
+ }
+
+ // we don't use endpointsnitch since we are trying to support hadoop nodes that are
+ // not necessarily on Cassandra machines, too. This should be adequate for single-DC clusters, at least.
+ private String getLocation()
+ {
+ Collection<InetAddress> localAddresses = FBUtilities.getAllLocalAddresses();
+
+ for (InetAddress address : localAddresses)
+ {
+ for (String location : split.getLocations())
+ {
+ InetAddress locationAddress;
+ try
+ {
+ locationAddress = InetAddress.getByName(location);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+ if (address.equals(locationAddress))
+ {
+ return location;
+ }
+ }
+ }
+ return split.getLocations()[0];
+ }
+
+ // Because the old Hadoop API wants us to write to the key and value
+ // and the new asks for them, we need to copy the output of the new API
+ // to the old. Thus, expect a small performance hit.
+ // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
+ // and ColumnFamilyRecordReader don't support them, it should be fine for now.
+ public boolean next(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> value) throws IOException
+ {
+ if (nextKeyValue())
+ {
+ value.clear();
+ value.putAll(getCurrentValue());
+
+ keys.clear();
+ keys.putAll(getCurrentKey());
+
+ return true;
+ }
+ return false;
+ }
+
+ public long getPos() throws IOException
+ {
+ return (long) rowIterator.totalRead;
+ }
+
+ public Map<String, ByteBuffer> createKey()
+ {
+ return new LinkedHashMap<String, ByteBuffer>();
+ }
+
+ public Map<String, ByteBuffer> createValue()
+ {
+ return new LinkedHashMap<String, ByteBuffer>();
+ }
+
+ /** CQL row iterator */
+ private class RowIterator extends AbstractIterator<Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>>>
+ {
+ protected int totalRead = 0; // total number of cf rows read
+ protected Iterator<CqlRow> rows;
+ private int pageRows = 0; // the number of cql rows read of this page
+ private String previousRowKey = null; // previous CF row key
+ private String partitionKeyString; // keys in <key1>, <key2>, <key3> string format
+ private String partitionKeyMarkers; // question marks in ? , ? , ? format which matches the number of keys
+
+ public RowIterator()
+ {
+ // initial page
+ executeQuery();
+ }
+
+ protected Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>> computeNext()
+ {
+ if (rows == null)
+ return endOfData();
+
+ int index = -2;
+ //check there are more page to read
+ while (!rows.hasNext())
+ {
+ // no more data
+ if (index == -1 || emptyPartitionKeyValues())
+ {
+ logger.debug("no more data.");
+ return endOfData();
+ }
+
+ index = setTailNull(clusterColumns);
+ logger.debug("set tail to null, index: " + index);
+ executeQuery();
+ pageRows = 0;
+
+ if (rows == null || !rows.hasNext() && index < 0)
+ {
+ logger.debug("no more data.");
+ return endOfData();
+ }
+ }
+
+ Map<String, ByteBuffer> valueColumns = createValue();
+ Map<String, ByteBuffer> keyColumns = createKey();
+ int i = 0;
+ CqlRow row = rows.next();
+ for (Column column : row.columns)
+ {
+ String columnName = stringValue(ByteBuffer.wrap(column.getName()));
+ logger.debug("column: " + columnName);
+
+ if (i < partitionBoundColumns.size() + clusterColumns.size())
+ keyColumns.put(stringValue(column.name), column.value);
+ else
+ valueColumns.put(stringValue(column.name), column.value);
+
+ i++;
+ }
+
+ // increase total CQL row read for this page
+ pageRows++;
+
+ // increase total CF row read
+ if (newRow(keyColumns, previousRowKey))
+ totalRead++;
+
+ // read full page
+ if (pageRows >= pageRowSize || !rows.hasNext())
+ {
+ Iterator<String> newKeys = keyColumns.keySet().iterator();
+ for (BoundColumn column : partitionBoundColumns)
+ column.value = keyColumns.get(newKeys.next());
+
+ for (BoundColumn column : clusterColumns)
+ column.value = keyColumns.get(newKeys.next());
+
+ executeQuery();
+ pageRows = 0;
+ }
+
+ return Pair.create(keyColumns, valueColumns);
+ }
+
+ /** check whether start to read a new CF row by comparing the partition keys */
+ private boolean newRow(Map<String, ByteBuffer> keyColumns, String previousRowKey)
+ {
+ if (keyColumns.isEmpty())
+ return false;
+
+ String rowKey = "";
+ if (keyColumns.size() == 1)
+ {
+ rowKey = partitionBoundColumns.get(0).validator.getString(keyColumns.get(partitionBoundColumns.get(0).name));
+ }
+ else
+ {
+ Iterator<ByteBuffer> iter = keyColumns.values().iterator();
+ for (BoundColumn column : partitionBoundColumns)
+ rowKey = rowKey + column.validator.getString(ByteBufferUtil.clone(iter.next())) + ":";
+ }
+
+ logger.debug("previous RowKey: " + previousRowKey + ", new row key: " + rowKey);
+ if (previousRowKey == null)
+ {
+ this.previousRowKey = rowKey;
+ return true;
+ }
+
+ if (rowKey.equals(previousRowKey))
+ return false;
+
+ this.previousRowKey = rowKey;
+ return true;
+ }
+
+ /** set the last non-null key value to null, and return the previous index */
+ private int setTailNull(List<BoundColumn> values)
+ {
+ if (values.isEmpty())
+ return -1;
+
+ Iterator<BoundColumn> iterator = values.iterator();
+ int previousIndex = -1;
+ BoundColumn current;
+ while (iterator.hasNext())
+ {
+ current = iterator.next();
+ if (current.value == null)
+ {
+ int index = previousIndex > 0 ? previousIndex : 0;
+ BoundColumn column = values.get(index);
+ logger.debug("set key " + column.name + " value to null");
+ column.value = null;
+ return previousIndex - 1;
+ }
+
+ previousIndex++;
+ }
+
+ BoundColumn column = values.get(previousIndex);
+ logger.debug("set key " + column.name + " value to null");
+ column.value = null;
+ return previousIndex - 1;
+ }
+
+ /** compose the prepared query, pair.left is query id, pair.right is query */
+ private Pair<Integer, String> composeQuery(String columns)
+ {
+ Pair<Integer, String> clause = whereClause();
+ if (columns == null)
+ {
+ columns = "*";
+ }
+ else
+ {
+ // add keys in the front in order
+ String partitionKey = keyString(partitionBoundColumns);
+ String clusterKey = keyString(clusterColumns);
+
+ columns = withoutKeyColumns(columns);
+ columns = (clusterKey == null || "".equals(clusterKey))
+ ? partitionKey + "," + columns
+ : partitionKey + "," + clusterKey + "," + columns;
+ }
+
+ return Pair.create(clause.left,
+ "SELECT " + columns
+ + " FROM " + cfName
+ + clause.right
+ + (userDefinedWhereClauses == null ? "" : " AND " + userDefinedWhereClauses)
+ + " LIMIT " + pageRowSize
+ + " ALLOW FILTERING");
+ }
+
+
+ /** remove key columns from the column string */
+ private String withoutKeyColumns(String columnString)
+ {
+ Set<String> keyNames = new HashSet<String>();
+ for (BoundColumn column : Iterables.concat(partitionBoundColumns, clusterColumns))
+ keyNames.add(column.name);
+
+ String[] columns = columnString.split(",");
+ String result = null;
+ for (String column : columns)
+ {
+ String trimmed = column.trim();
+ if (keyNames.contains(trimmed))
+ continue;
+
+ result = result == null ? trimmed : result + "," + trimmed;
+ }
+ return result;
+ }
+
+ /** compose the where clause */
+ private Pair<Integer, String> whereClause()
+ {
+ if (partitionKeyString == null)
+ partitionKeyString = keyString(partitionBoundColumns);
+
+ if (partitionKeyMarkers == null)
+ partitionKeyMarkers = partitionKeyMarkers();
+ // initial query token(k) >= start_token and token(k) <= end_token
+ if (emptyPartitionKeyValues())
+ return Pair.create(0, " WHERE token(" + partitionKeyString + ") > ? AND token(" + partitionKeyString + ") <= ?");
+
+ // query token(k) > token(pre_partition_key) and token(k) <= end_token
+ if (clusterColumns.size() == 0 || clusterColumns.get(0).value == null)
+ return Pair.create(1,
+ " WHERE token(" + partitionKeyString + ") > token(" + partitionKeyMarkers + ") "
+ + " AND token(" + partitionKeyString + ") <= ?");
+
+ // query token(k) = token(pre_partition_key) and m = pre_cluster_key_m and n > pre_cluster_key_n
+ Pair<Integer, String> clause = whereClause(clusterColumns, 0);
+ return Pair.create(clause.left,
+ " WHERE token(" + partitionKeyString + ") = token(" + partitionKeyMarkers + ") " + clause.right);
+ }
+
+ /** recursively compose the where clause */
+ private Pair<Integer, String> whereClause(List<BoundColumn> column, int position)
+ {
+ if (position == column.size() - 1 || column.get(position + 1).value == null)
+ return Pair.create(position + 2, " AND " + column.get(position).name + " > ? ");
+
+ Pair<Integer, String> clause = whereClause(column, position + 1);
+ return Pair.create(clause.left, " AND " + column.get(position).name + " = ? " + clause.right);
+ }
+
+ /** check whether all key values are null */
+ private boolean emptyPartitionKeyValues()
+ {
+ for (BoundColumn column : partitionBoundColumns)
+ {
+ if (column.value != null)
+ return false;
+ }
+ return true;
+ }
+
+ /** compose the partition key string in format of <key1>, <key2>, <key3> */
+ private String keyString(List<BoundColumn> columns)
+ {
+ String result = null;
+ for (BoundColumn column : columns)
+ result = result == null ? column.name : result + "," + column.name;
+
+ return result == null ? "" : result;
+ }
+
+ /** compose the question marks for partition key string in format of ?, ? , ? */
+ private String partitionKeyMarkers()
+ {
+ String result = null;
+ for (BoundColumn column : partitionBoundColumns)
+ result = result == null ? "?" : result + ",?";
+
+ return result;
+ }
+
+ /** compose the query binding variables, pair.left is query id, pair.right is the binding variables */
+ private Pair<Integer, List<ByteBuffer>> preparedQueryBindValues()
+ {
+ List<ByteBuffer> values = new LinkedList<ByteBuffer>();
+
+ // initial query token(k) >= start_token and token(k) <= end_token
+ if (emptyPartitionKeyValues())
+ {
+ values.add(partitioner.getTokenValidator().fromString(split.getStartToken()));
+ values.add(partitioner.getTokenValidator().fromString(split.getEndToken()));
+ return Pair.create(0, values);
+ }
+ else
+ {
+ for (BoundColumn partitionBoundColumn1 : partitionBoundColumns)
+ values.add(partitionBoundColumn1.value);
+
+ if (clusterColumns.size() == 0 || clusterColumns.get(0).value == null)
+ {
+ // query token(k) > token(pre_partition_key) and token(k) <= end_token
+ values.add(partitioner.getTokenValidator().fromString(split.getEndToken()));
+ return Pair.create(1, values);
+ }
+ else
+ {
+ // query token(k) = token(pre_partition_key) and m = pre_cluster_key_m and n > pre_cluster_key_n
+ int type = preparedQueryBindValues(clusterColumns, 0, values);
+ return Pair.create(type, values);
+ }
+ }
+ }
+
+ /** recursively compose the query binding variables */
+ private int preparedQueryBindValues(List<BoundColumn> column, int position, List<ByteBuffer> bindValues)
+ {
+ if (position == column.size() - 1 || column.get(position + 1).value == null)
+ {
+ bindValues.add(column.get(position).value);
+ return position + 2;
+ }
+ else
+ {
+ bindValues.add(column.get(position).value);
+ return preparedQueryBindValues(column, position + 1, bindValues);
+ }
+ }
+
+ /** get the prepared query item Id */
+ private int prepareQuery(int type) throws InvalidRequestException, TException
+ {
+ Integer itemId = preparedQueryIds.get(type);
+ if (itemId != null)
+ return itemId;
+
+ Pair<Integer, String> query = null;
+ query = composeQuery(columns);
+ logger.debug("type:" + query.left + ", query: " + query.right);
+ CqlPreparedResult cqlPreparedResult = client.prepare_cql3_query(ByteBufferUtil.bytes(query.right), Compression.NONE);
+ preparedQueryIds.put(query.left, cqlPreparedResult.itemId);
+ return cqlPreparedResult.itemId;
+ }
+
+ /** execute the prepared query */
+ private void executeQuery()
+ {
+ Pair<Integer, List<ByteBuffer>> bindValues = preparedQueryBindValues();
+ logger.debug("query type: " + bindValues.left);
+
+ // check whether it reach end of range for type 1 query CASSANDRA-5573
+ if (bindValues.left == 1 && reachEndRange())
+ {
+ rows = null;
+ return;
+ }
+
+ int retries = 0;
+ // only try three times for TimedOutException and UnavailableException
+ while (retries < 3)
+ {
+ try
+ {
+ CqlResult cqlResult = client.execute_prepared_cql3_query(prepareQuery(bindValues.left), bindValues.right, consistencyLevel);
+ if (cqlResult != null && cqlResult.rows != null)
+ rows = cqlResult.rows.iterator();
+ return;
+ }
+ catch (TimedOutException e)
+ {
+ retries++;
+ if (retries >= 3)
+ {
+ rows = null;
+ RuntimeException rte = new RuntimeException(e.getMessage());
+ rte.initCause(e);
+ throw rte;
+ }
+ }
+ catch (UnavailableException e)
+ {
+ retries++;
+ if (retries >= 3)
+ {
+ rows = null;
+ RuntimeException rte = new RuntimeException(e.getMessage());
+ rte.initCause(e);
+ throw rte;
+ }
+ }
+ catch (Exception e)
+ {
+ rows = null;
+ RuntimeException rte = new RuntimeException(e.getMessage());
+ rte.initCause(e);
+ throw rte;
+ }
+ }
+ }
+ }
+
+ /** retrieve the partition keys and cluster keys from system.schema_columnfamilies table */
+ private void retrieveKeys() throws Exception
+ {
+ String query = "select key_aliases," +
+ "column_aliases, " +
+ "key_validator, " +
+ "comparator " +
+ "from system.schema_columnfamilies " +
+ "where keyspace_name='%s' and columnfamily_name='%s'";
+ String formatted = String.format(query, keyspace, cfName);
+ CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(formatted), Compression.NONE, ConsistencyLevel.ONE);
+
+ CqlRow cqlRow = result.rows.get(0);
+ String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
+ logger.debug("partition keys: " + keyString);
+ List<String> keys = FBUtilities.fromJsonList(keyString);
+
+ for (String key : keys)
+ partitionBoundColumns.add(new BoundColumn(key));
+
+ keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
+ logger.debug("cluster columns: " + keyString);
+ keys = FBUtilities.fromJsonList(keyString);
+
+ for (String key : keys)
+ clusterColumns.add(new BoundColumn(key));
+
+ Column rawKeyValidator = cqlRow.columns.get(2);
+ String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue()));
+ logger.debug("row key validator: " + validator);
+ keyValidator = parseType(validator);
+
+ if (keyValidator instanceof CompositeType)
+ {
+ List<AbstractType<?>> types = ((CompositeType) keyValidator).types;
+ for (int i = 0; i < partitionBoundColumns.size(); i++)
+ partitionBoundColumns.get(i).validator = types.get(i);
+ }
+ else
+ {
+ partitionBoundColumns.get(0).validator = keyValidator;
+ }
+ }
+
+ /** check whether current row is at the end of range */
+ private boolean reachEndRange()
+ {
+ // current row key
+ ByteBuffer rowKey;
+ if (keyValidator instanceof CompositeType)
+ {
+ ByteBuffer[] keys = new ByteBuffer[partitionBoundColumns.size()];
+ for (int i = 0; i < partitionBoundColumns.size(); i++)
+ keys[i] = partitionBoundColumns.get(i).value.duplicate();
+
+ rowKey = ((CompositeType) keyValidator).build(keys);
+ }
+ else
+ {
+ rowKey = partitionBoundColumns.get(0).value;
+ }
+
+ String endToken = split.getEndToken();
+ String currentToken = partitioner.getToken(rowKey).toString();
+ logger.debug("End token: " + endToken + ", current token: " + currentToken);
+
+ return endToken.equals(currentToken);
+ }
+
+ private static AbstractType<?> parseType(String type) throws IOException
+ {
+ try
+ {
+ // always treat counters like longs, specifically CCT.compose is not what we need
+ if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
+ return LongType.instance;
+ return TypeParser.parse(type);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ catch (SyntaxException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ private class BoundColumn
+ {
+ final String name;
+ ByteBuffer value;
+ AbstractType<?> validator;
+
+ public BoundColumn(String name)
+ {
+ this.name = name;
+ }
+ }
+
+ /** get string from a ByteBuffer, catch the exception and throw it as runtime exception*/
+ private static String stringValue(ByteBuffer value)
+ {
+ try
+ {
+ return ByteBufferUtil.string(value);
+ }
+ catch (CharacterCodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0047797/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
new file mode 100644
index 0000000..dde6b1f
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.Progressable;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The <code>ColumnFamilyRecordWriter</code> maps the output <key, value>
+ * pairs to a Cassandra column family. In particular, it applies the binded variables
+ * in the value to the prepared statement, which it associates with the key, and in
+ * turn the responsible endpoint.
+ *
+ * <p>
+ * Furthermore, this writer groups the cql queries by the endpoint responsible for
+ * the rows being affected. This allows the cql queries to be executed in parallel,
+ * directly to a responsible endpoint.
+ * </p>
+ *
+ * @see CqlOutputFormat
+ */
+final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>>
+{
+ private static final Logger logger = LoggerFactory.getLogger(CqlRecordWriter.class);
+
+ // handles for clients for each range running in the threadpool
+ private final Map<Range, RangeClient> clients;
+
+ // host to prepared statement id mappings
+ private ConcurrentHashMap<Cassandra.Client, Integer> preparedStatements = new ConcurrentHashMap<Cassandra.Client, Integer>();
+
+ private final String cql;
+
+ private AbstractType<?> keyValidator;
+ private String [] partitionkeys;
+
+ /**
+ * Upon construction, obtain the map that this writer will use to collect
+ * mutations, and the ring cache for the given keyspace.
+ *
+ * @param context the task attempt context
+ * @throws IOException
+ */
+ CqlRecordWriter(TaskAttemptContext context) throws IOException
+ {
+ this(context.getConfiguration());
+ this.progressable = new Progressable(context);
+ }
+
+ CqlRecordWriter(Configuration conf, Progressable progressable) throws IOException
+ {
+ this(conf);
+ this.progressable = progressable;
+ }
+
+ CqlRecordWriter(Configuration conf) throws IOException
+ {
+ super(conf);
+ this.clients = new HashMap<Range, RangeClient>();
+ cql = CqlConfigHelper.getOutputCql(conf);
+
+ try
+ {
+ String host = getAnyHost();
+ int port = ConfigHelper.getOutputRpcPort(conf);
+ Cassandra.Client client = CqlOutputFormat.createAuthenticatedClient(host, port, conf);
+ retrievePartitionKeyValidator(client);
+
+ if (client != null)
+ {
+ TTransport transport = client.getOutputProtocol().getTransport();
+ if (transport.isOpen())
+ transport.close();
+ client = null;
+ }
+ }
+ catch (Exception e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ // close all the clients before throwing anything
+ IOException clientException = null;
+ for (RangeClient client : clients.values())
+ {
+ try
+ {
+ client.close();
+ }
+ catch (IOException e)
+ {
+ clientException = e;
+ }
+ }
+
+ if (clientException != null)
+ throw clientException;
+ }
+
+ /**
+ * If the key is to be associated with a valid value, a mutation is created
+ * for it with the given column family and columns. In the event the value
+ * in the column is missing (i.e., null), then it is marked for
+ * {@link Deletion}. Similarly, if the entire value for a key is missing
+ * (i.e., null), then the entire key is marked for {@link Deletion}.
+ * </p>
+ *
+ * @param keyColumns
+ * the key to write.
+ * @param values
+ * the values to write.
+ * @throws IOException
+ */
+ @Override
+ public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException
+ {
+ ByteBuffer rowKey = getRowKey(keyColumns);
+ Range<Token> range = ringCache.getRange(rowKey);
+
+ // get the client for the given range, or create a new one
+ RangeClient client = clients.get(range);
+ if (client == null)
+ {
+ // haven't seen keys for this range: create new client
+ client = new RangeClient(ringCache.getEndpoint(range));
+ client.start();
+ clients.put(range, client);
+ }
+
+ client.put(Pair.create(rowKey, values));
+ progressable.progress();
+ }
+
+ /**
+ * A client that runs in a threadpool and connects to the list of endpoints for a particular
+ * range. Bound variables for keys in that range are sent to this client via a queue.
+ */
+ public class RangeClient extends AbstractRangeClient<List<ByteBuffer>>
+ {
+ /**
+ * Constructs an {@link RangeClient} for the given endpoints.
+ * @param endpoints the possible endpoints to execute the mutations on
+ */
+ public RangeClient(List<InetAddress> endpoints)
+ {
+ super(endpoints);
+ }
+
+ /**
+ * Loops collecting cql binded variable values from the queue and sending to Cassandra
+ */
+ public void run()
+ {
+ outer:
+ while (run || !queue.isEmpty())
+ {
+ Pair<ByteBuffer, List<ByteBuffer>> item;
+ try
+ {
+ item = queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ // re-check loop condition after interrupt
+ continue;
+ }
+
+ Iterator<InetAddress> iter = endpoints.iterator();
+ while (true)
+ {
+ // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly.
+ try
+ {
+ int i = 0;
+ int itemId = preparedStatement(client);
+ while (item != null)
+ {
+ List<ByteBuffer> bindVariables = item.right;
+ client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE);
+ i++;
+
+ if (i >= batchThreshold)
+ break;
+
+ item = queue.poll();
+ }
+
+ break;
+ }
+ catch (Exception e)
+ {
+ closeInternal();
+ if (!iter.hasNext())
+ {
+ lastException = new IOException(e);
+ break outer;
+ }
+ }
+
+ // attempt to connect to a different endpoint
+ try
+ {
+ InetAddress address = iter.next();
+ String host = address.getHostName();
+ int port = ConfigHelper.getOutputRpcPort(conf);
+ client = CqlOutputFormat.createAuthenticatedClient(host, port, conf);
+ }
+ catch (Exception e)
+ {
+ closeInternal();
+ // TException means something unexpected went wrong to that endpoint, so
+ // we should try again to another. Other exceptions (auth or invalid request) are fatal.
+ if ((!(e instanceof TException)) || !iter.hasNext())
+ {
+ lastException = new IOException(e);
+ break outer;
+ }
+ }
+ }
+ }
+ }
+
+ /** get prepared statement id from cache, otherwise prepare it from Cassandra server*/
+ private int preparedStatement(Cassandra.Client client)
+ {
+ Integer itemId = preparedStatements.get(client);
+ if (itemId == null)
+ {
+ CqlPreparedResult result;
+ try
+ {
+ result = client.prepare_cql3_query(ByteBufferUtil.bytes(cql), Compression.NONE);
+ }
+ catch (InvalidRequestException e)
+ {
+ throw new RuntimeException("failed to prepare cql query " + cql, e);
+ }
+ catch (TException e)
+ {
+ throw new RuntimeException("failed to prepare cql query " + cql, e);
+ }
+
+ Integer previousId = preparedStatements.putIfAbsent(client, Integer.valueOf(result.itemId));
+ itemId = previousId == null ? result.itemId : previousId;
+ }
+ return itemId;
+ }
+ }
+
+ private ByteBuffer getRowKey(Map<String, ByteBuffer> keyColumns)
+ {
+ //current row key
+ ByteBuffer rowKey;
+ if (keyValidator instanceof CompositeType)
+ {
+ ByteBuffer[] keys = new ByteBuffer[partitionkeys.length];
+ for (int i = 0; i< keys.length; i++)
+ keys[i] = keyColumns.get(partitionkeys[i]);
+
+ rowKey = ((CompositeType) keyValidator).build(keys);
+ }
+ else
+ {
+ rowKey = keyColumns.get(partitionkeys[0]);
+ }
+ return rowKey;
+ }
+
+ /** retrieve the key validator from system.schema_columnfamilies table */
+ private void retrievePartitionKeyValidator(Cassandra.Client client) throws Exception
+ {
+ String keyspace = ConfigHelper.getOutputKeyspace(conf);
+ String cfName = ConfigHelper.getOutputColumnFamily(conf);
+ String query = "SELECT key_validator," +
+ " key_aliases " +
+ "FROM system.schema_columnfamilies " +
+ "WHERE keyspace_name='%s' and columnfamily_name='%s'";
+ String formatted = String.format(query, keyspace, cfName);
+ CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(formatted), Compression.NONE, ConsistencyLevel.ONE);
+
+ Column rawKeyValidator = result.rows.get(0).columns.get(0);
+ String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue()));
+ keyValidator = parseType(validator);
+
+ Column rawPartitionKeys = result.rows.get(0).columns.get(1);
+ String keyString = ByteBufferUtil.string(ByteBuffer.wrap(rawPartitionKeys.getValue()));
+ logger.debug("partition keys: " + keyString);
+
+ List<String> keys = FBUtilities.fromJsonList(keyString);
+ partitionkeys = new String[keys.size()];
+ int i = 0;
+ for (String key : keys)
+ {
+ partitionkeys[i] = key;
+ i++;
+ }
+ }
+
+ private AbstractType<?> parseType(String type) throws IOException
+ {
+ try
+ {
+ // always treat counters like longs, specifically CCT.compose is not what we need
+ if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
+ return LongType.instance;
+ return TypeParser.parse(type);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ catch (SyntaxException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ private String getAnyHost() throws IOException, InvalidRequestException, TException
+ {
+ Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
+ List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf));
+ try
+ {
+ for (TokenRange range : ring)
+ return range.endpoints.get(0);
+ }
+ finally
+ {
+ TTransport transport = client.getOutputProtocol().getTransport();
+ if (transport.isOpen())
+ transport.close();
+ }
+ throw new IOException("There are no endpoints");
+ }
+
+}
[5/5] git commit: Merge branch 'cassandra-1.2' into trunk
Posted by db...@apache.org.
Merge branch 'cassandra-1.2' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b16c1a5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b16c1a5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b16c1a5
Branch: refs/heads/trunk
Commit: 6b16c1a570c608ba1e1dd15a3213e42f21b58c96
Parents: 2e9f51d 60b3584
Author: Dave Brosius <db...@apache.org>
Authored: Thu Jun 6 18:15:40 2013 -0400
Committer: Dave Brosius <db...@apache.org>
Committed: Thu Jun 6 18:15:40 2013 -0400
----------------------------------------------------------------------
.../cassandra/hadoop/cql3/CQLConfigHelper.java | 109 ---------------
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 109 +++++++++++++++
2 files changed, 109 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
[3/5] git commit: rename
Posted by db...@apache.org.
rename
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a0047797
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a0047797
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a0047797
Branch: refs/heads/trunk
Commit: a0047797638b9c0d30ebf225e8a00a029e05be0a
Parents: ff5b830
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Jun 6 17:00:08 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Jun 6 17:00:56 2013 -0500
----------------------------------------------------------------------
examples/hadoop_cql3_word_count/src/WordCount.java | 18 +-
.../src/WordCountCounters.java | 11 +-
.../cassandra/hadoop/cql3/CQLConfigHelper.java | 2 +-
.../hadoop/cql3/ColumnFamilyInputFormat.java | 83 --
.../hadoop/cql3/ColumnFamilyOutputFormat.java | 78 --
.../hadoop/cql3/ColumnFamilyRecordReader.java | 763 ---------------
.../hadoop/cql3/ColumnFamilyRecordWriter.java | 383 --------
.../cassandra/hadoop/cql3/CqlOutputFormat.java | 78 ++
.../hadoop/cql3/CqlPagingInputFormat.java | 83 ++
.../hadoop/cql3/CqlPagingRecordReader.java | 763 +++++++++++++++
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 383 ++++++++
11 files changed, 1320 insertions(+), 1325 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0047797/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
index 09dd9e4..611f9c2 100644
--- a/examples/hadoop_cql3_word_count/src/WordCount.java
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -21,13 +21,12 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.Map.Entry;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.hadoop.cql3.ColumnFamilyOutputFormat;
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
+import org.apache.cassandra.hadoop.cql3.CqlOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.hadoop.cql3.CQLConfigHelper;
-import org.apache.cassandra.hadoop.cql3.ColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
@@ -38,7 +37,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -208,28 +206,28 @@ public class WordCount extends Configured implements Tool
job.setOutputKeyClass(Map.class);
job.setOutputValueClass(List.class);
- job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
+ job.setOutputFormatClass(CqlOutputFormat.class);
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
job.getConfiguration().set(PRIMARY_KEY, "word,sum");
String query = "INSERT INTO " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY +
" (row_id1, row_id2, word, count_num) " +
" values (?, ?, ?, ?)";
- CQLConfigHelper.setOutputCql(job.getConfiguration(), query);
+ CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
}
- job.setInputFormatClass(ColumnFamilyInputFormat.class);
+ job.setInputFormatClass(CqlPagingInputFormat.class);
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
- CQLConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
+ CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
//this is the user defined filter clauses, you can comment it out if you want count all titles
- CQLConfigHelper.setInputWhereClauses(job.getConfiguration(), "title='A'");
+ CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "title='A'");
job.waitForCompletion(true);
return 0;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0047797/examples/hadoop_cql3_word_count/src/WordCountCounters.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCountCounters.java b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
index 1cf5539..8454b70 100644
--- a/examples/hadoop_cql3_word_count/src/WordCountCounters.java
+++ b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
@@ -24,23 +24,20 @@ import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
+import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.cassandra.hadoop.cql3.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.hadoop.cql3.CQLConfigHelper;
-import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -107,14 +104,14 @@ public class WordCountCounters extends Configured implements Tool
job.setOutputValueClass(LongWritable.class);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
- job.setInputFormatClass(ColumnFamilyInputFormat.class);
+ job.setInputFormatClass(CqlPagingInputFormat.class);
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCount.OUTPUT_COLUMN_FAMILY);
- CQLConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
+ CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
job.waitForCompletion(true);
return 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0047797/src/java/org/apache/cassandra/hadoop/cql3/CQLConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CQLConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CQLConfigHelper.java
index 66bcfdb..cb61d05 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CQLConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CQLConfigHelper.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.hadoop.cql3;
*/
import org.apache.hadoop.conf.Configuration;
-public class CQLConfigHelper
+public class CqlConfigHelper
{
private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0047797/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyInputFormat.java
deleted file mode 100644
index 525ed89..0000000
--- a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyInputFormat.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.cql3;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-/**
- * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
- *
- * At minimum, you need to set the KS and CF in your Hadoop job Configuration.
- * The ConfigHelper class is provided to make this
- * simple:
- * ConfigHelper.setInputColumnFamily
- *
- * You can also configure the number of rows per InputSplit with
- * ConfigHelper.setInputSplitSize. The default split size is 64k rows.
- * the number of CQL rows per page
- *
- * the number of CQL rows per page
- * CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You
- * should set it to "as big as possible, but no bigger." It set the LIMIT for the CQL
- * query, so you need set it big enough to minimize the network overhead, and also
- * not too big to avoid out of memory issue.
- *
- * the column names of the select CQL query. The default is all columns
- * CQLConfigHelper.setInputColumns
- *
- * the user defined the where clause
- * CQLConfigHelper.setInputWhereClauses. The default is no user defined where clause
- */
-public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<Map<String, ByteBuffer>, Map<String, ByteBuffer>>
-{
- public RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
- throws IOException
- {
- TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
- {
- @Override
- public void progress()
- {
- reporter.progress();
- }
- };
-
- ColumnFamilyRecordReader recordReader = new ColumnFamilyRecordReader();
- recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
- return recordReader;
- }
-
- @Override
- public org.apache.hadoop.mapreduce.RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> createRecordReader(
- org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1) throws IOException,
- InterruptedException
- {
- return new ColumnFamilyRecordReader();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0047797/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyOutputFormat.java
deleted file mode 100644
index 3f6e2af..0000000
--- a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyOutputFormat.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.cql3;
-
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat;
-import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.hadoop.Progressable;
-import org.apache.hadoop.mapreduce.*;
-
-/**
- * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
- * OutputFormat that allows reduce tasks to store keys (and corresponding
- * binded variable values) as CQL rows (and respective columns) in a given
- * ColumnFamily.
- *
- * <p>
- * As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
- * prepared statement in your
- * Hadoop job Configuration. The {@link CQLConfigHelper} class, through its
- * {@link ConfigHelper#setOutputPreparedStatement} method, is provided to make this
- * simple.
- * you need to set the Keyspace. The {@link ConfigHelper} class, through its
- * {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this
- * simple.
- * </p>
- *
- * <p>
- * For the sake of performance, this class employs a lazy write-back caching
- * mechanism, where its record writer prepared statement binded variable values
- * created based on the reduce's inputs (in a task-specific map), and periodically
- * makes the changes official by sending a execution of prepared statement request
- * to Cassandra.
- * </p>
- */
-public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
-{
- /** Fills the deprecated OutputFormat interface for streaming. */
- @Deprecated
- public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
- {
- return new ColumnFamilyRecordWriter(job, new Progressable(progress));
- }
-
- /**
- * Get the {@link RecordWriter} for the given task.
- *
- * @param context
- * the information about the current task.
- * @return a {@link RecordWriter} to write the output for the job.
- * @throws IOException
- */
- public ColumnFamilyRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
- {
- return new ColumnFamilyRecordWriter(context);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0047797/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordReader.java
deleted file mode 100644
index 03d7af5..0000000
--- a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordReader.java
+++ /dev/null
@@ -1,763 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.cql3;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.*;
-
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.ColumnFamilySplit;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
-
-/**
- * Hadoop RecordReader read the values return from the CQL query
- * It use CQL key range query to page through the wide rows.
- * <p/>
- * Return List<IColumn> as keys columns
- * <p/>
- * Map<ByteBuffer, IColumn> as column name to columns mappings
- */
-public class ColumnFamilyRecordReader extends RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>>
- implements org.apache.hadoop.mapred.RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>>
-{
- private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
-
- public static final int DEFAULT_CQL_PAGE_LIMIT = 1000; // TODO: find the number large enough but not OOM
-
- private ColumnFamilySplit split;
- private RowIterator rowIterator;
-
- private Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>> currentRow;
- private int totalRowCount; // total number of rows to fetch
- private String keyspace;
- private String cfName;
- private Cassandra.Client client;
- private ConsistencyLevel consistencyLevel;
-
- // partition keys -- key aliases
- private List<BoundColumn> partitionBoundColumns = new ArrayList<BoundColumn>();
-
- // cluster keys -- column aliases
- private List<BoundColumn> clusterColumns = new ArrayList<BoundColumn>();
-
- // map prepared query type to item id
- private Map<Integer, Integer> preparedQueryIds = new HashMap<Integer, Integer>();
-
- // cql query select columns
- private String columns;
-
- // the number of cql rows per page
- private int pageRowSize;
-
- // user defined where clauses
- private String userDefinedWhereClauses;
-
- private IPartitioner partitioner;
-
- private AbstractType<?> keyValidator;
-
- public ColumnFamilyRecordReader()
- {
- super();
- }
-
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
- {
- this.split = (ColumnFamilySplit) split;
- Configuration conf = context.getConfiguration();
- totalRowCount = (this.split.getLength() < Long.MAX_VALUE)
- ? (int) this.split.getLength()
- : ConfigHelper.getInputSplitSize(conf);
- cfName = ConfigHelper.getInputColumnFamily(conf);
- consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf));
- keyspace = ConfigHelper.getInputKeyspace(conf);
- columns = CQLConfigHelper.getInputcolumns(conf);
- userDefinedWhereClauses = CQLConfigHelper.getInputWhereClauses(conf);
-
- try
- {
- pageRowSize = Integer.parseInt(CQLConfigHelper.getInputPageRowSize(conf));
- }
- catch (NumberFormatException e)
- {
- pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
- }
-
- partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
-
- try
- {
- if (client != null)
- return;
-
- // create connection using thrift
- String location = getLocation();
-
- int port = ConfigHelper.getInputRpcPort(conf);
- client = ColumnFamilyInputFormat.createAuthenticatedClient(location, port, conf);
-
- // retrieve partition keys and cluster keys from system.schema_columnfamilies table
- retrieveKeys();
-
- client.set_keyspace(keyspace);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- rowIterator = new RowIterator();
-
- logger.debug("created {}", rowIterator);
- }
-
- public void close()
- {
- if (client != null)
- {
- TTransport transport = client.getOutputProtocol().getTransport();
- if (transport.isOpen())
- transport.close();
- client = null;
- }
- }
-
- public Map<String, ByteBuffer> getCurrentKey()
- {
- return currentRow.left;
- }
-
- public Map<String, ByteBuffer> getCurrentValue()
- {
- return currentRow.right;
- }
-
- public float getProgress()
- {
- if (!rowIterator.hasNext())
- return 1.0F;
-
- // the progress is likely to be reported slightly off the actual but close enough
- float progress = ((float) rowIterator.totalRead / totalRowCount);
- return progress > 1.0F ? 1.0F : progress;
- }
-
- public boolean nextKeyValue() throws IOException
- {
- if (!rowIterator.hasNext())
- {
- logger.debug("Finished scanning " + rowIterator.totalRead + " rows (estimate was: " + totalRowCount + ")");
- return false;
- }
-
- try
- {
- currentRow = rowIterator.next();
- }
- catch (Exception e)
- {
- // throw it as IOException, so client can catch it and handle it at client side
- IOException ioe = new IOException(e.getMessage());
- ioe.initCause(ioe.getCause());
- throw ioe;
- }
- return true;
- }
-
- // we don't use endpointsnitch since we are trying to support hadoop nodes that are
- // not necessarily on Cassandra machines, too. This should be adequate for single-DC clusters, at least.
- private String getLocation()
- {
- Collection<InetAddress> localAddresses = FBUtilities.getAllLocalAddresses();
-
- for (InetAddress address : localAddresses)
- {
- for (String location : split.getLocations())
- {
- InetAddress locationAddress;
- try
- {
- locationAddress = InetAddress.getByName(location);
- }
- catch (UnknownHostException e)
- {
- throw new AssertionError(e);
- }
- if (address.equals(locationAddress))
- {
- return location;
- }
- }
- }
- return split.getLocations()[0];
- }
-
- // Because the old Hadoop API wants us to write to the key and value
- // and the new asks for them, we need to copy the output of the new API
- // to the old. Thus, expect a small performance hit.
- // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
- // and ColumnFamilyRecordReader don't support them, it should be fine for now.
- public boolean next(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> value) throws IOException
- {
- if (nextKeyValue())
- {
- value.clear();
- value.putAll(getCurrentValue());
-
- keys.clear();
- keys.putAll(getCurrentKey());
-
- return true;
- }
- return false;
- }
-
- public long getPos() throws IOException
- {
- return (long) rowIterator.totalRead;
- }
-
- public Map<String, ByteBuffer> createKey()
- {
- return new LinkedHashMap<String, ByteBuffer>();
- }
-
- public Map<String, ByteBuffer> createValue()
- {
- return new LinkedHashMap<String, ByteBuffer>();
- }
-
- /** CQL row iterator */
- private class RowIterator extends AbstractIterator<Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>>>
- {
- protected int totalRead = 0; // total number of cf rows read
- protected Iterator<CqlRow> rows;
- private int pageRows = 0; // the number of cql rows read of this page
- private String previousRowKey = null; // previous CF row key
- private String partitionKeyString; // keys in <key1>, <key2>, <key3> string format
- private String partitionKeyMarkers; // question marks in ? , ? , ? format which matches the number of keys
-
- public RowIterator()
- {
- // initial page
- executeQuery();
- }
-
- protected Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>> computeNext()
- {
- if (rows == null)
- return endOfData();
-
- int index = -2;
- //check there are more page to read
- while (!rows.hasNext())
- {
- // no more data
- if (index == -1 || emptyPartitionKeyValues())
- {
- logger.debug("no more data.");
- return endOfData();
- }
-
- index = setTailNull(clusterColumns);
- logger.debug("set tail to null, index: " + index);
- executeQuery();
- pageRows = 0;
-
- if (rows == null || !rows.hasNext() && index < 0)
- {
- logger.debug("no more data.");
- return endOfData();
- }
- }
-
- Map<String, ByteBuffer> valueColumns = createValue();
- Map<String, ByteBuffer> keyColumns = createKey();
- int i = 0;
- CqlRow row = rows.next();
- for (Column column : row.columns)
- {
- String columnName = stringValue(ByteBuffer.wrap(column.getName()));
- logger.debug("column: " + columnName);
-
- if (i < partitionBoundColumns.size() + clusterColumns.size())
- keyColumns.put(stringValue(column.name), column.value);
- else
- valueColumns.put(stringValue(column.name), column.value);
-
- i++;
- }
-
- // increase total CQL row read for this page
- pageRows++;
-
- // increase total CF row read
- if (newRow(keyColumns, previousRowKey))
- totalRead++;
-
- // read full page
- if (pageRows >= pageRowSize || !rows.hasNext())
- {
- Iterator<String> newKeys = keyColumns.keySet().iterator();
- for (BoundColumn column : partitionBoundColumns)
- column.value = keyColumns.get(newKeys.next());
-
- for (BoundColumn column : clusterColumns)
- column.value = keyColumns.get(newKeys.next());
-
- executeQuery();
- pageRows = 0;
- }
-
- return Pair.create(keyColumns, valueColumns);
- }
-
- /** check whether start to read a new CF row by comparing the partition keys */
- private boolean newRow(Map<String, ByteBuffer> keyColumns, String previousRowKey)
- {
- if (keyColumns.isEmpty())
- return false;
-
- String rowKey = "";
- if (keyColumns.size() == 1)
- {
- rowKey = partitionBoundColumns.get(0).validator.getString(keyColumns.get(partitionBoundColumns.get(0).name));
- }
- else
- {
- Iterator<ByteBuffer> iter = keyColumns.values().iterator();
- for (BoundColumn column : partitionBoundColumns)
- rowKey = rowKey + column.validator.getString(ByteBufferUtil.clone(iter.next())) + ":";
- }
-
- logger.debug("previous RowKey: " + previousRowKey + ", new row key: " + rowKey);
- if (previousRowKey == null)
- {
- this.previousRowKey = rowKey;
- return true;
- }
-
- if (rowKey.equals(previousRowKey))
- return false;
-
- this.previousRowKey = rowKey;
- return true;
- }
-
- /** set the last non-null key value to null, and return the previous index */
- private int setTailNull(List<BoundColumn> values)
- {
- if (values.isEmpty())
- return -1;
-
- Iterator<BoundColumn> iterator = values.iterator();
- int previousIndex = -1;
- BoundColumn current;
- while (iterator.hasNext())
- {
- current = iterator.next();
- if (current.value == null)
- {
- int index = previousIndex > 0 ? previousIndex : 0;
- BoundColumn column = values.get(index);
- logger.debug("set key " + column.name + " value to null");
- column.value = null;
- return previousIndex - 1;
- }
-
- previousIndex++;
- }
-
- BoundColumn column = values.get(previousIndex);
- logger.debug("set key " + column.name + " value to null");
- column.value = null;
- return previousIndex - 1;
- }
-
- /** compose the prepared query, pair.left is query id, pair.right is query */
- private Pair<Integer, String> composeQuery(String columns)
- {
- Pair<Integer, String> clause = whereClause();
- if (columns == null)
- {
- columns = "*";
- }
- else
- {
- // add keys in the front in order
- String partitionKey = keyString(partitionBoundColumns);
- String clusterKey = keyString(clusterColumns);
-
- columns = withoutKeyColumns(columns);
- columns = (clusterKey == null || "".equals(clusterKey))
- ? partitionKey + "," + columns
- : partitionKey + "," + clusterKey + "," + columns;
- }
-
- return Pair.create(clause.left,
- "SELECT " + columns
- + " FROM " + cfName
- + clause.right
- + (userDefinedWhereClauses == null ? "" : " AND " + userDefinedWhereClauses)
- + " LIMIT " + pageRowSize
- + " ALLOW FILTERING");
- }
-
-
- /** remove key columns from the column string */
- private String withoutKeyColumns(String columnString)
- {
- Set<String> keyNames = new HashSet<String>();
- for (BoundColumn column : Iterables.concat(partitionBoundColumns, clusterColumns))
- keyNames.add(column.name);
-
- String[] columns = columnString.split(",");
- String result = null;
- for (String column : columns)
- {
- String trimmed = column.trim();
- if (keyNames.contains(trimmed))
- continue;
-
- result = result == null ? trimmed : result + "," + trimmed;
- }
- return result;
- }
-
- /** compose the where clause */
- private Pair<Integer, String> whereClause()
- {
- if (partitionKeyString == null)
- partitionKeyString = keyString(partitionBoundColumns);
-
- if (partitionKeyMarkers == null)
- partitionKeyMarkers = partitionKeyMarkers();
- // initial query token(k) >= start_token and token(k) <= end_token
- if (emptyPartitionKeyValues())
- return Pair.create(0, " WHERE token(" + partitionKeyString + ") > ? AND token(" + partitionKeyString + ") <= ?");
-
- // query token(k) > token(pre_partition_key) and token(k) <= end_token
- if (clusterColumns.size() == 0 || clusterColumns.get(0).value == null)
- return Pair.create(1,
- " WHERE token(" + partitionKeyString + ") > token(" + partitionKeyMarkers + ") "
- + " AND token(" + partitionKeyString + ") <= ?");
-
- // query token(k) = token(pre_partition_key) and m = pre_cluster_key_m and n > pre_cluster_key_n
- Pair<Integer, String> clause = whereClause(clusterColumns, 0);
- return Pair.create(clause.left,
- " WHERE token(" + partitionKeyString + ") = token(" + partitionKeyMarkers + ") " + clause.right);
- }
-
- /** recursively compose the where clause */
- private Pair<Integer, String> whereClause(List<BoundColumn> column, int position)
- {
- if (position == column.size() - 1 || column.get(position + 1).value == null)
- return Pair.create(position + 2, " AND " + column.get(position).name + " > ? ");
-
- Pair<Integer, String> clause = whereClause(column, position + 1);
- return Pair.create(clause.left, " AND " + column.get(position).name + " = ? " + clause.right);
- }
-
- /** check whether all key values are null */
- private boolean emptyPartitionKeyValues()
- {
- for (BoundColumn column : partitionBoundColumns)
- {
- if (column.value != null)
- return false;
- }
- return true;
- }
-
- /** compose the partition key string in format of <key1>, <key2>, <key3> */
- private String keyString(List<BoundColumn> columns)
- {
- String result = null;
- for (BoundColumn column : columns)
- result = result == null ? column.name : result + "," + column.name;
-
- return result == null ? "" : result;
- }
-
- /** compose the question marks for partition key string in format of ?, ? , ? */
- private String partitionKeyMarkers()
- {
- String result = null;
- for (BoundColumn column : partitionBoundColumns)
- result = result == null ? "?" : result + ",?";
-
- return result;
- }
-
- /** compose the query binding variables, pair.left is query id, pair.right is the binding variables */
- private Pair<Integer, List<ByteBuffer>> preparedQueryBindValues()
- {
- List<ByteBuffer> values = new LinkedList<ByteBuffer>();
-
- // initial query token(k) >= start_token and token(k) <= end_token
- if (emptyPartitionKeyValues())
- {
- values.add(partitioner.getTokenValidator().fromString(split.getStartToken()));
- values.add(partitioner.getTokenValidator().fromString(split.getEndToken()));
- return Pair.create(0, values);
- }
- else
- {
- for (BoundColumn partitionBoundColumn1 : partitionBoundColumns)
- values.add(partitionBoundColumn1.value);
-
- if (clusterColumns.size() == 0 || clusterColumns.get(0).value == null)
- {
- // query token(k) > token(pre_partition_key) and token(k) <= end_token
- values.add(partitioner.getTokenValidator().fromString(split.getEndToken()));
- return Pair.create(1, values);
- }
- else
- {
- // query token(k) = token(pre_partition_key) and m = pre_cluster_key_m and n > pre_cluster_key_n
- int type = preparedQueryBindValues(clusterColumns, 0, values);
- return Pair.create(type, values);
- }
- }
- }
-
- /** recursively compose the query binding variables */
- private int preparedQueryBindValues(List<BoundColumn> column, int position, List<ByteBuffer> bindValues)
- {
- if (position == column.size() - 1 || column.get(position + 1).value == null)
- {
- bindValues.add(column.get(position).value);
- return position + 2;
- }
- else
- {
- bindValues.add(column.get(position).value);
- return preparedQueryBindValues(column, position + 1, bindValues);
- }
- }
-
- /** get the prepared query item Id */
- private int prepareQuery(int type) throws InvalidRequestException, TException
- {
- Integer itemId = preparedQueryIds.get(type);
- if (itemId != null)
- return itemId;
-
- Pair<Integer, String> query = null;
- query = composeQuery(columns);
- logger.debug("type:" + query.left + ", query: " + query.right);
- CqlPreparedResult cqlPreparedResult = client.prepare_cql3_query(ByteBufferUtil.bytes(query.right), Compression.NONE);
- preparedQueryIds.put(query.left, cqlPreparedResult.itemId);
- return cqlPreparedResult.itemId;
- }
-
- /** execute the prepared query */
- private void executeQuery()
- {
- Pair<Integer, List<ByteBuffer>> bindValues = preparedQueryBindValues();
- logger.debug("query type: " + bindValues.left);
-
- // check whether it reach end of range for type 1 query CASSANDRA-5573
- if (bindValues.left == 1 && reachEndRange())
- {
- rows = null;
- return;
- }
-
- int retries = 0;
- // only try three times for TimedOutException and UnavailableException
- while (retries < 3)
- {
- try
- {
- CqlResult cqlResult = client.execute_prepared_cql3_query(prepareQuery(bindValues.left), bindValues.right, consistencyLevel);
- if (cqlResult != null && cqlResult.rows != null)
- rows = cqlResult.rows.iterator();
- return;
- }
- catch (TimedOutException e)
- {
- retries++;
- if (retries >= 3)
- {
- rows = null;
- RuntimeException rte = new RuntimeException(e.getMessage());
- rte.initCause(e);
- throw rte;
- }
- }
- catch (UnavailableException e)
- {
- retries++;
- if (retries >= 3)
- {
- rows = null;
- RuntimeException rte = new RuntimeException(e.getMessage());
- rte.initCause(e);
- throw rte;
- }
- }
- catch (Exception e)
- {
- rows = null;
- RuntimeException rte = new RuntimeException(e.getMessage());
- rte.initCause(e);
- throw rte;
- }
- }
- }
- }
-
- /** retrieve the partition keys and cluster keys from system.schema_columnfamilies table */
- private void retrieveKeys() throws Exception
- {
- String query = "select key_aliases," +
- "column_aliases, " +
- "key_validator, " +
- "comparator " +
- "from system.schema_columnfamilies " +
- "where keyspace_name='%s' and columnfamily_name='%s'";
- String formatted = String.format(query, keyspace, cfName);
- CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(formatted), Compression.NONE, ConsistencyLevel.ONE);
-
- CqlRow cqlRow = result.rows.get(0);
- String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
- logger.debug("partition keys: " + keyString);
- List<String> keys = FBUtilities.fromJsonList(keyString);
-
- for (String key : keys)
- partitionBoundColumns.add(new BoundColumn(key));
-
- keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
- logger.debug("cluster columns: " + keyString);
- keys = FBUtilities.fromJsonList(keyString);
-
- for (String key : keys)
- clusterColumns.add(new BoundColumn(key));
-
- Column rawKeyValidator = cqlRow.columns.get(2);
- String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue()));
- logger.debug("row key validator: " + validator);
- keyValidator = parseType(validator);
-
- if (keyValidator instanceof CompositeType)
- {
- List<AbstractType<?>> types = ((CompositeType) keyValidator).types;
- for (int i = 0; i < partitionBoundColumns.size(); i++)
- partitionBoundColumns.get(i).validator = types.get(i);
- }
- else
- {
- partitionBoundColumns.get(0).validator = keyValidator;
- }
- }
-
- /** check whether current row is at the end of range */
- private boolean reachEndRange()
- {
- // current row key
- ByteBuffer rowKey;
- if (keyValidator instanceof CompositeType)
- {
- ByteBuffer[] keys = new ByteBuffer[partitionBoundColumns.size()];
- for (int i = 0; i < partitionBoundColumns.size(); i++)
- keys[i] = partitionBoundColumns.get(i).value.duplicate();
-
- rowKey = ((CompositeType) keyValidator).build(keys);
- }
- else
- {
- rowKey = partitionBoundColumns.get(0).value;
- }
-
- String endToken = split.getEndToken();
- String currentToken = partitioner.getToken(rowKey).toString();
- logger.debug("End token: " + endToken + ", current token: " + currentToken);
-
- return endToken.equals(currentToken);
- }
-
- private static AbstractType<?> parseType(String type) throws IOException
- {
- try
- {
- // always treat counters like longs, specifically CCT.compose is not what we need
- if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
- return LongType.instance;
- return TypeParser.parse(type);
- }
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
- catch (SyntaxException e)
- {
- throw new IOException(e);
- }
- }
-
- private class BoundColumn
- {
- final String name;
- ByteBuffer value;
- AbstractType<?> validator;
-
- public BoundColumn(String name)
- {
- this.name = name;
- }
- }
-
- /** get string from a ByteBuffer, catch the exception and throw it as runtime exception*/
- private static String stringValue(ByteBuffer value)
- {
- try
- {
- return ByteBufferUtil.string(value);
- }
- catch (CharacterCodingException e)
- {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0047797/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java
deleted file mode 100644
index c32d9ef..0000000
--- a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.cql3;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.hadoop.Progressable;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The <code>ColumnFamilyRecordWriter</code> maps the output <key, value>
- * pairs to a Cassandra column family. In particular, it applies the binded variables
- * in the value to the prepared statement, which it associates with the key, and in
- * turn the responsible endpoint.
- *
- * <p>
- * Furthermore, this writer groups the cql queries by the endpoint responsible for
- * the rows being affected. This allows the cql queries to be executed in parallel,
- * directly to a responsible endpoint.
- * </p>
- *
- * @see ColumnFamilyOutputFormat
- */
-final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>>
-{
- private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordWriter.class);
-
- // handles for clients for each range running in the threadpool
- private final Map<Range, RangeClient> clients;
-
- // host to prepared statement id mappings
- private ConcurrentHashMap<Cassandra.Client, Integer> preparedStatements = new ConcurrentHashMap<Cassandra.Client, Integer>();
-
- private final String cql;
-
- private AbstractType<?> keyValidator;
- private String [] partitionkeys;
-
- /**
- * Upon construction, obtain the map that this writer will use to collect
- * mutations, and the ring cache for the given keyspace.
- *
- * @param context the task attempt context
- * @throws IOException
- */
- ColumnFamilyRecordWriter(TaskAttemptContext context) throws IOException
- {
- this(context.getConfiguration());
- this.progressable = new Progressable(context);
- }
-
- ColumnFamilyRecordWriter(Configuration conf, Progressable progressable) throws IOException
- {
- this(conf);
- this.progressable = progressable;
- }
-
- ColumnFamilyRecordWriter(Configuration conf) throws IOException
- {
- super(conf);
- this.clients = new HashMap<Range, RangeClient>();
- cql = CQLConfigHelper.getOutputCql(conf);
-
- try
- {
- String host = getAnyHost();
- int port = ConfigHelper.getOutputRpcPort(conf);
- Cassandra.Client client = ColumnFamilyOutputFormat.createAuthenticatedClient(host, port, conf);
- retrievePartitionKeyValidator(client);
-
- if (client != null)
- {
- TTransport transport = client.getOutputProtocol().getTransport();
- if (transport.isOpen())
- transport.close();
- client = null;
- }
- }
- catch (Exception e)
- {
- throw new IOException(e);
- }
- }
-
- @Override
- public void close() throws IOException
- {
- // close all the clients before throwing anything
- IOException clientException = null;
- for (RangeClient client : clients.values())
- {
- try
- {
- client.close();
- }
- catch (IOException e)
- {
- clientException = e;
- }
- }
-
- if (clientException != null)
- throw clientException;
- }
-
- /**
- * If the key is to be associated with a valid value, a mutation is created
- * for it with the given column family and columns. In the event the value
- * in the column is missing (i.e., null), then it is marked for
- * {@link Deletion}. Similarly, if the entire value for a key is missing
- * (i.e., null), then the entire key is marked for {@link Deletion}.
- * </p>
- *
- * @param keyColumns
- * the key to write.
- * @param values
- * the values to write.
- * @throws IOException
- */
- @Override
- public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException
- {
- ByteBuffer rowKey = getRowKey(keyColumns);
- Range<Token> range = ringCache.getRange(rowKey);
-
- // get the client for the given range, or create a new one
- RangeClient client = clients.get(range);
- if (client == null)
- {
- // haven't seen keys for this range: create new client
- client = new RangeClient(ringCache.getEndpoint(range));
- client.start();
- clients.put(range, client);
- }
-
- client.put(Pair.create(rowKey, values));
- progressable.progress();
- }
-
- /**
- * A client that runs in a threadpool and connects to the list of endpoints for a particular
- * range. Bound variables for keys in that range are sent to this client via a queue.
- */
- public class RangeClient extends AbstractRangeClient<List<ByteBuffer>>
- {
- /**
- * Constructs an {@link RangeClient} for the given endpoints.
- * @param endpoints the possible endpoints to execute the mutations on
- */
- public RangeClient(List<InetAddress> endpoints)
- {
- super(endpoints);
- }
-
- /**
- * Loops collecting cql binded variable values from the queue and sending to Cassandra
- */
- public void run()
- {
- outer:
- while (run || !queue.isEmpty())
- {
- Pair<ByteBuffer, List<ByteBuffer>> item;
- try
- {
- item = queue.take();
- }
- catch (InterruptedException e)
- {
- // re-check loop condition after interrupt
- continue;
- }
-
- Iterator<InetAddress> iter = endpoints.iterator();
- while (true)
- {
- // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly.
- try
- {
- int i = 0;
- int itemId = preparedStatement(client);
- while (item != null)
- {
- List<ByteBuffer> bindVariables = item.right;
- client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE);
- i++;
-
- if (i >= batchThreshold)
- break;
-
- item = queue.poll();
- }
-
- break;
- }
- catch (Exception e)
- {
- closeInternal();
- if (!iter.hasNext())
- {
- lastException = new IOException(e);
- break outer;
- }
- }
-
- // attempt to connect to a different endpoint
- try
- {
- InetAddress address = iter.next();
- String host = address.getHostName();
- int port = ConfigHelper.getOutputRpcPort(conf);
- client = ColumnFamilyOutputFormat.createAuthenticatedClient(host, port, conf);
- }
- catch (Exception e)
- {
- closeInternal();
- // TException means something unexpected went wrong to that endpoint, so
- // we should try again to another. Other exceptions (auth or invalid request) are fatal.
- if ((!(e instanceof TException)) || !iter.hasNext())
- {
- lastException = new IOException(e);
- break outer;
- }
- }
- }
- }
- }
-
- /** get prepared statement id from cache, otherwise prepare it from Cassandra server*/
- private int preparedStatement(Cassandra.Client client)
- {
- Integer itemId = preparedStatements.get(client);
- if (itemId == null)
- {
- CqlPreparedResult result;
- try
- {
- result = client.prepare_cql3_query(ByteBufferUtil.bytes(cql), Compression.NONE);
- }
- catch (InvalidRequestException e)
- {
- throw new RuntimeException("failed to prepare cql query " + cql, e);
- }
- catch (TException e)
- {
- throw new RuntimeException("failed to prepare cql query " + cql, e);
- }
-
- Integer previousId = preparedStatements.putIfAbsent(client, Integer.valueOf(result.itemId));
- itemId = previousId == null ? result.itemId : previousId;
- }
- return itemId;
- }
- }
-
- private ByteBuffer getRowKey(Map<String, ByteBuffer> keyColumns)
- {
- //current row key
- ByteBuffer rowKey;
- if (keyValidator instanceof CompositeType)
- {
- ByteBuffer[] keys = new ByteBuffer[partitionkeys.length];
- for (int i = 0; i< keys.length; i++)
- keys[i] = keyColumns.get(partitionkeys[i]);
-
- rowKey = ((CompositeType) keyValidator).build(keys);
- }
- else
- {
- rowKey = keyColumns.get(partitionkeys[0]);
- }
- return rowKey;
- }
-
- /** retrieve the key validator from system.schema_columnfamilies table */
- private void retrievePartitionKeyValidator(Cassandra.Client client) throws Exception
- {
- String keyspace = ConfigHelper.getOutputKeyspace(conf);
- String cfName = ConfigHelper.getOutputColumnFamily(conf);
- String query = "SELECT key_validator," +
- " key_aliases " +
- "FROM system.schema_columnfamilies " +
- "WHERE keyspace_name='%s' and columnfamily_name='%s'";
- String formatted = String.format(query, keyspace, cfName);
- CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(formatted), Compression.NONE, ConsistencyLevel.ONE);
-
- Column rawKeyValidator = result.rows.get(0).columns.get(0);
- String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue()));
- keyValidator = parseType(validator);
-
- Column rawPartitionKeys = result.rows.get(0).columns.get(1);
- String keyString = ByteBufferUtil.string(ByteBuffer.wrap(rawPartitionKeys.getValue()));
- logger.debug("partition keys: " + keyString);
-
- List<String> keys = FBUtilities.fromJsonList(keyString);
- partitionkeys = new String[keys.size()];
- int i = 0;
- for (String key : keys)
- {
- partitionkeys[i] = key;
- i++;
- }
- }
-
- private AbstractType<?> parseType(String type) throws IOException
- {
- try
- {
- // always treat counters like longs, specifically CCT.compose is not what we need
- if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
- return LongType.instance;
- return TypeParser.parse(type);
- }
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
- catch (SyntaxException e)
- {
- throw new IOException(e);
- }
- }
-
- private String getAnyHost() throws IOException, InvalidRequestException, TException
- {
- Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
- List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf));
- try
- {
- for (TokenRange range : ring)
- return range.endpoints.get(0);
- }
- finally
- {
- TTransport transport = client.getOutputProtocol().getTransport();
- if (transport.isOpen())
- transport.close();
- }
- throw new IOException("There are no endpoints");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0047797/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
new file mode 100644
index 0000000..d1e93c8
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat;
+import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.Progressable;
+import org.apache.hadoop.mapreduce.*;
+
+/**
+ * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
+ * OutputFormat that allows reduce tasks to store keys (and corresponding
+ * binded variable values) as CQL rows (and respective columns) in a given
+ * ColumnFamily.
+ *
+ * <p>
+ * As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
+ * prepared statement in your
+ * Hadoop job Configuration. The {@link CqlConfigHelper} class, through its
+ * {@link ConfigHelper#setOutputPreparedStatement} method, is provided to make this
+ * simple.
+ * you need to set the Keyspace. The {@link ConfigHelper} class, through its
+ * {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this
+ * simple.
+ * </p>
+ *
+ * <p>
+ * For the sake of performance, this class employs a lazy write-back caching
+ * mechanism, where its record writer prepared statement binded variable values
+ * created based on the reduce's inputs (in a task-specific map), and periodically
+ * makes the changes official by sending a execution of prepared statement request
+ * to Cassandra.
+ * </p>
+ */
+public class CqlOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
+{
+ /** Fills the deprecated OutputFormat interface for streaming. */
+ @Deprecated
+ public CqlRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
+ {
+ return new CqlRecordWriter(job, new Progressable(progress));
+ }
+
+ /**
+ * Get the {@link RecordWriter} for the given task.
+ *
+ * @param context
+ * the information about the current task.
+ * @return a {@link RecordWriter} to write the output for the job.
+ * @throws IOException
+ */
+ public CqlRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new CqlRecordWriter(context);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0047797/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
new file mode 100644
index 0000000..0e1509e
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
+ *
+ * At minimum, you need to set the KS and CF in your Hadoop job Configuration.
+ * The ConfigHelper class is provided to make this
+ * simple:
+ * ConfigHelper.setInputColumnFamily
+ *
+ * You can also configure the number of rows per InputSplit with
+ * ConfigHelper.setInputSplitSize. The default split size is 64k rows.
+ * the number of CQL rows per page
+ *
+ * the number of CQL rows per page
+ * CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You
+ * should set it to "as big as possible, but no bigger." It set the LIMIT for the CQL
+ * query, so you need set it big enough to minimize the network overhead, and also
+ * not too big to avoid out of memory issue.
+ *
+ * the column names of the select CQL query. The default is all columns
+ * CQLConfigHelper.setInputColumns
+ *
+ * the user defined the where clause
+ * CQLConfigHelper.setInputWhereClauses. The default is no user defined where clause
+ */
+public class CqlPagingInputFormat extends AbstractColumnFamilyInputFormat<Map<String, ByteBuffer>, Map<String, ByteBuffer>>
+{
+ public RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
+ throws IOException
+ {
+ TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
+ {
+ @Override
+ public void progress()
+ {
+ reporter.progress();
+ }
+ };
+
+ CqlPagingRecordReader recordReader = new CqlPagingRecordReader();
+ recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
+ return recordReader;
+ }
+
+ @Override
+ public org.apache.hadoop.mapreduce.RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> createRecordReader(
+ org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1) throws IOException,
+ InterruptedException
+ {
+ return new CqlPagingRecordReader();
+ }
+
+}