Posted to commits@cassandra.apache.org by br...@apache.org on 2015/01/13 18:28:17 UTC
[2/3] cassandra git commit: Pig: Refactor and deprecate CqlStorage
Pig: Refactor and deprecate CqlStorage
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-8599
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/359d3bb2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/359d3bb2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/359d3bb2
Branch: refs/heads/trunk
Commit: 359d3bb2a69b856e6dca8a060f6087307808cb5e
Parents: 1cb426b
Author: Brandon Williams <br...@apache.org>
Authored: Tue Jan 13 11:24:45 2015 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Jan 13 11:24:45 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
examples/pig/README.txt | 27 +-
examples/pig/example-script-cql.pig | 6 +-
examples/pig/test/test_cql_storage.pig | 12 +-
.../hadoop/pig/AbstractCassandraStorage.java | 2 +-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 520 +++++++++++++-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 693 +------------------
7 files changed, 542 insertions(+), 719 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/359d3bb2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f2e25c4..e070eaf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Pig: Refactor and deprecate CqlStorage (CASSANDRA-8599)
* Don't reuse the same cleanup strategy for all sstables (CASSANDRA-8537)
* Fix case-sensitivity of index name on CREATE and DROP INDEX
statements (CASSANDRA-8365)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/359d3bb2/examples/pig/README.txt
----------------------------------------------------------------------
diff --git a/examples/pig/README.txt b/examples/pig/README.txt
index 2ae9824..1553a9f 100644
--- a/examples/pig/README.txt
+++ b/examples/pig/README.txt
@@ -35,7 +35,7 @@ for input and output:
CassandraStorage
================
-The CassandraStorage class is for any non-CQL3 ColumnFamilies you may have. For CQL3 support, refer to the CqlStorage section.
+The CassandraStorage class is for any non-CQL3 ColumnFamilies you may have. For CQL3 support, refer to the CqlNativeStorage section.
examples/pig$ bin/pig_cassandra -x local example-script.pig
@@ -95,15 +95,24 @@ PIG_INPUT_SPLIT_SIZE: this sets the split size passed to Hadoop, controlling
the number of mapper tasks created. This can also be set in the LOAD url by
adding the 'split_size=X' parameter, where X is an integer amount for the size.
-CqlStorage
-==========
-
-The CqlStorage class is somewhat similar to CassandraStorage, but it can work with CQL3-defined ColumnFamilies. The main difference is in the URL format:
-
-cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>][&where_clause=<clause>][&split_size=<size>][&use_secondary=true|false][&partitioner=<partitioner>]]
+CqlNativeStorage
+================
+The CqlNativeStorage class is somewhat similar to CassandraStorage, but it can work with CQL3-defined ColumnFamilies. The main difference is in the URL format:
+
+cql://[username:password@]<keyspace>/<columnfamily>
+ [?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]
+ [&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]
+ [&init_address=<host>][&native_port=<native_port>][&core_conns=<core_conns>]
+ [&max_conns=<max_conns>][&min_simult_reqs=<min_simult_reqs>][&max_simult_reqs=<max_simult_reqs>]
+ [&native_timeout=<native_timeout>][&native_read_timeout=<native_read_timeout>][&rec_buff_size=<rec_buff_size>]
+ [&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]
+ [&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]
+ [&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]
+ [&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]
+ [&columns=<columns>][&where_clause=<where_clause>]]
In grunt, the simplest example would look like:
-grunt> rows = LOAD 'cql://MyKeyspace/MyColumnFamily' USING CqlStorage();
+grunt> rows = LOAD 'cql://MyKeyspace/MyColumnFamily' USING CqlNativeStorage();
-CqlStorage handles wide rows automatically and thus has no separate flag for this.
+CqlNativeStorage handles wide rows automatically and thus has no separate flag for this.
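To make the parameter list above concrete, here is a small standalone Java sketch (not part of this commit; the class name, keyspace, table, and all values are illustrative placeholders) that assembles a load location using a few of the native-protocol options:

// Illustrative only: builds a cql:// location string with a handful of the
// parameters documented above. Every name and value here is an example.
public class CqlUrlExample
{
    public static void main(String[] args)
    {
        String url = "cql://cql3ks/test"
                   + "?page_size=500"          // CQL rows fetched per page
                   + "&split_size=65536"       // rows per Hadoop input split
                   + "&init_address=127.0.0.1" // contact point
                   + "&native_port=9042"       // native protocol port
                   + "&partitioner=org.apache.cassandra.dht.Murmur3Partitioner";
        // In grunt: rows = LOAD '<url>' USING CqlNativeStorage();
        System.out.println(url);
    }
}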
http://git-wip-us.apache.org/repos/asf/cassandra/blob/359d3bb2/examples/pig/example-script-cql.pig
----------------------------------------------------------------------
diff --git a/examples/pig/example-script-cql.pig b/examples/pig/example-script-cql.pig
index 63656a7..ef11130 100644
--- a/examples/pig/example-script-cql.pig
+++ b/examples/pig/example-script-cql.pig
@@ -1,5 +1,5 @@
--- CqlStorage
-libdata = LOAD 'cql://libdata/libout' USING CqlStorage();
+-- CqlNativeStorage
+libdata = LOAD 'cql://libdata/libout' USING CqlNativeStorage();
book_by_mail = FILTER libdata BY C_OUT_TY == 'BM';
libdata_buildings = FILTER libdata BY SQ_FEET > 0;
@@ -8,4 +8,4 @@ state_grouped = GROUP state_flat BY State;
state_footage = FOREACH state_grouped GENERATE group AS State, SUM(state_flat.SquareFeet) AS TotalFeet:int;
insert_format= FOREACH state_footage GENERATE TOTUPLE(TOTUPLE('year',2011),TOTUPLE('state',State)),TOTUPLE(TotalFeet);
-STORE insert_format INTO 'cql://libdata/libsqft?output_query=UPDATE%20libdata.libsqft%20SET%20sqft%20%3D%20%3F' USING CqlStorage;
\ No newline at end of file
+STORE insert_format INTO 'cql://libdata/libsqft?output_query=UPDATE%20libdata.libsqft%20SET%20sqft%20%3D%20%3F' USING CqlNativeStorage;
\ No newline at end of file
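The output_query parameter in the STORE location above is a URL-encoded prepared statement. A minimal sketch (not part of the commit; the class name is invented for illustration) of producing that encoding:

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class OutputQueryEncoding
{
    public static void main(String[] args) throws UnsupportedEncodingException
    {
        String cql = "UPDATE libdata.libsqft SET sqft = ?";
        // URLEncoder encodes spaces as '+', while the script above uses %20;
        // both decode to the same statement when the location is parsed.
        String encoded = URLEncoder.encode(cql, "UTF-8");
        System.out.println("cql://libdata/libsqft?output_query=" + encoded);
    }
}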
http://git-wip-us.apache.org/repos/asf/cassandra/blob/359d3bb2/examples/pig/test/test_cql_storage.pig
----------------------------------------------------------------------
diff --git a/examples/pig/test/test_cql_storage.pig b/examples/pig/test/test_cql_storage.pig
index 3383d4a..822748e 100644
--- a/examples/pig/test/test_cql_storage.pig
+++ b/examples/pig/test/test_cql_storage.pig
@@ -1,14 +1,14 @@
-moretestvalues= LOAD 'cql://cql3ks/moredata/' USING CqlStorage;
+moretestvalues= LOAD 'cql://cql3ks/moredata/' USING CqlNativeStorage;
insertformat= FOREACH moretestvalues GENERATE TOTUPLE(TOTUPLE('a',x)),TOTUPLE(y);
-STORE insertformat INTO 'cql://cql3ks/test?output_query=UPDATE+cql3ks.test+set+b+%3D+%3F' USING CqlStorage;
+STORE insertformat INTO 'cql://cql3ks/test?output_query=UPDATE+cql3ks.test+set+b+%3D+%3F' USING CqlNativeStorage;
-- composite key
-moredata = load 'cql://cql3ks/compmore' USING CqlStorage;
+moredata = load 'cql://cql3ks/compmore' USING CqlNativeStorage;
insertformat = FOREACH moredata GENERATE TOTUPLE (TOTUPLE('a',x),TOTUPLE('b',y), TOTUPLE('c',z)),TOTUPLE(data);
-STORE insertformat INTO 'cql://cql3ks/compotable?output_query=UPDATE%20cql3ks.compotable%20SET%20d%20%3D%20%3F' USING CqlStorage;
+STORE insertformat INTO 'cql://cql3ks/compotable?output_query=UPDATE%20cql3ks.compotable%20SET%20d%20%3D%20%3F' USING CqlNativeStorage;
-- collection column
-collectiontable = LOAD 'cql://cql3ks/collectiontable/' USING CqlStorage;
+collectiontable = LOAD 'cql://cql3ks/collectiontable/' USING CqlNativeStorage;
-- recs= (((m,kk)),((map,(m,mm),(n,nn))))
recs= FOREACH collectiontable GENERATE TOTUPLE(TOTUPLE('m', m) ), TOTUPLE(TOTUPLE('map', TOTUPLE('m', 'mm'), TOTUPLE('n', 'nn')));
-store recs INTO 'cql://cql3ks/collectiontable?output_query=update+cql3ks.collectiontable+set+n+%3D+%3F' USING CqlStorage();
+store recs INTO 'cql://cql3ks/collectiontable?output_query=update+cql3ks.collectiontable+set+n+%3D+%3F' USING CqlNativeStorage();
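The TOTUPLE expressions in these tests build the tuple shape the storage class expects on output: a first tuple of (name, value) key pairs, followed by a tuple of bind values for the '?' placeholders in output_query. A standalone sketch of the same shape using Pig's tuple API (class name and values are illustrative, not from the commit):

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class StoreTupleShape
{
    public static void main(String[] args) throws Exception
    {
        TupleFactory tf = TupleFactory.getInstance();

        Tuple keyPair = tf.newTuple(2);   // ('a', x)
        keyPair.set(0, "a");
        keyPair.set(1, 1);

        Tuple keys = tf.newTuple(1);      // TOTUPLE(TOTUPLE('a', x))
        keys.set(0, keyPair);

        Tuple values = tf.newTuple(1);    // TOTUPLE(y), bound to the '?'
        values.set(0, 42);

        Tuple row = tf.newTuple(2);       // what putNext receives
        row.set(0, keys);
        row.set(1, values);
        System.out.println(row);
    }
}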
http://git-wip-us.apache.org/repos/asf/cassandra/blob/359d3bb2/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 035f99a..5884f29 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -683,7 +683,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
if (cassandraStorage)
return columnDefs;
- // otherwise for CqlStorage, check metadata for classic thrift tables
+ // otherwise for CqlNativeStorage, check metadata for classic thrift tables
CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
for (ColumnDefinition def : cfm.regularAndStaticColumns())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/359d3bb2/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index f0bb8f9..3eb6823 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -19,30 +19,49 @@ package org.apache.cassandra.hadoop.pig;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.Map;
+import java.nio.charset.CharacterCodingException;
+import java.util.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.BufferCell;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
-import org.apache.cassandra.thrift.CfDef;
-import org.apache.cassandra.thrift.ColumnDef;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.utils.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.pig.Expression;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.Expression.OpType;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datastax.driver.core.Row;
-public class CqlNativeStorage extends CqlStorage
+public class CqlNativeStorage extends AbstractCassandraStorage
{
+ private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class);
+ private int pageSize = 1000;
+ private String columns;
+ private String outputQuery;
+ private String whereClause;
+ private boolean hasCompactValueAlias = false;
+
private RecordReader<Long, Row> reader;
+ private RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
private String nativePort;
private String nativeCoreConnections;
private String nativeMaxConnections;
@@ -72,8 +91,10 @@ public class CqlNativeStorage extends CqlStorage
/** @param pageSize limit number of CQL rows to fetch in a thrift request */
public CqlNativeStorage(int pageSize)
{
- super(pageSize);
+ super();
+ this.pageSize = pageSize;
DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlInputFormat";
+ DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlOutputFormat";
}
public void prepareToRead(RecordReader reader, PigSplit split)
@@ -84,6 +105,11 @@ public class CqlNativeStorage extends CqlStorage
}
}
+ public void prepareToWrite(RecordWriter writer)
+ {
+ this.writer = writer;
+ }
+
/** get next row */
public Tuple getNext() throws IOException
{
@@ -121,6 +147,421 @@ public class CqlNativeStorage extends CqlStorage
}
}
+ /** convert a cql column to an object */
+ private Object cqlColumnToObj(Cell col, CfDef cfDef) throws IOException
+ {
+ // standard
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+ ByteBuffer cellName = col.name().toByteBuffer();
+ if (validators.get(cellName) == null)
+ return cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.DEFAULT_VALIDATOR), col.value());
+ else
+ return cassandraToObj(validators.get(cellName), col.value());
+ }
+
+ /** set the value to the position of the tuple */
+ private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ if (validator instanceof CollectionType)
+ setCollectionTupleValues(tuple, position, value, validator);
+ else
+ setTupleValue(tuple, position, value);
+ }
+
+ /** set the values of set/list at and after the position of the tuple */
+ private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ if (validator instanceof MapType)
+ {
+ setMapTupleValues(tuple, position, value, validator);
+ return;
+ }
+ AbstractType elementValidator;
+ if (validator instanceof SetType)
+ elementValidator = ((SetType<?>) validator).getElementsType();
+ else if (validator instanceof ListType)
+ elementValidator = ((ListType<?>) validator).getElementsType();
+ else
+ return;
+
+ int i = 0;
+ Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+ for (Object entry : (Collection<?>) value)
+ {
+ setTupleValue(innerTuple, i, cassandraToPigData(entry, elementValidator), elementValidator);
+ i++;
+ }
+ tuple.set(position, innerTuple);
+ }
+
+ /** set the values of set/list at and after the position of the tuple */
+ private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ AbstractType<?> keyValidator = ((MapType<?, ?>) validator).getKeysType();
+ AbstractType<?> valueValidator = ((MapType<?, ?>) validator).getValuesType();
+
+ int i = 0;
+ Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
+ for(Map.Entry<?,?> entry : ((Map<Object, Object>)value).entrySet())
+ {
+ Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+ setTupleValue(mapEntryTuple, 0, cassandraToPigData(entry.getKey(), keyValidator), keyValidator);
+ setTupleValue(mapEntryTuple, 1, cassandraToPigData(entry.getValue(), valueValidator), valueValidator);
+ innerTuple.set(i, mapEntryTuple);
+ i++;
+ }
+ tuple.set(position, innerTuple);
+ }
+
+ private Object cassandraToPigData(Object obj, AbstractType validator)
+ {
+ if (validator instanceof DecimalType || validator instanceof InetAddressType)
+ return validator.getString(validator.decompose(obj));
+ return obj;
+ }
+
+ /** include key columns */
+ protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
+ throws InvalidRequestException,
+ UnavailableException,
+ TimedOutException,
+ SchemaDisagreementException,
+ TException,
+ CharacterCodingException,
+ org.apache.cassandra.exceptions.InvalidRequestException,
+ ConfigurationException,
+ NotFoundException
+ {
+ List<ColumnDef> keyColumns = null;
+ // get key columns
+ try
+ {
+ keyColumns = getKeysMeta(client);
+ }
+ catch(Exception e)
+ {
+ logger.error("Error in retrieving key columns" , e);
+ }
+
+ // get other columns
+ List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias);
+
+ // combine all columns in a list
+ if (keyColumns != null && columns != null)
+ keyColumns.addAll(columns);
+
+ return keyColumns;
+ }
+
+ /** get keys meta data */
+ private List<ColumnDef> getKeysMeta(Cassandra.Client client)
+ throws Exception
+ {
+ String query = "SELECT key_aliases, " +
+ " column_aliases, " +
+ " key_validator, " +
+ " comparator, " +
+ " keyspace_name, " +
+ " value_alias, " +
+ " default_validator " +
+ "FROM system.schema_columnfamilies " +
+ "WHERE keyspace_name = '%s'" +
+ " AND columnfamily_name = '%s' ";
+
+ CqlResult result = client.execute_cql3_query(
+ ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
+ Compression.NONE,
+ ConsistencyLevel.ONE);
+
+ if (result == null || result.rows == null || result.rows.isEmpty())
+ return null;
+
+ Iterator<CqlRow> iteraRow = result.rows.iterator();
+ List<ColumnDef> keys = new ArrayList<ColumnDef>();
+ if (iteraRow.hasNext())
+ {
+ CqlRow cqlRow = iteraRow.next();
+ String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
+ logger.debug("Found ksDef name: {}", name);
+ String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
+
+ logger.debug("partition keys: {}", keyString);
+ List<String> keyNames = FBUtilities.fromJsonList(keyString);
+
+ Iterator<String> iterator = keyNames.iterator();
+ while (iterator.hasNext())
+ {
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(iterator.next());
+ keys.add(cDef);
+ }
+ // classic thrift tables
+ if (keys.size() == 0)
+ {
+ CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
+ for (ColumnDefinition def : cfm.partitionKeyColumns())
+ {
+ String key = def.name.toString();
+ logger.debug("name: {} ", key);
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(key);
+ keys.add(cDef);
+ }
+ for (ColumnDefinition def : cfm.clusteringColumns())
+ {
+ String key = def.name.toString();
+ logger.debug("name: {} ", key);
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(key);
+ keys.add(cDef);
+ }
+ }
+
+ keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
+
+ logger.debug("cluster keys: {}", keyString);
+ keyNames = FBUtilities.fromJsonList(keyString);
+
+ iterator = keyNames.iterator();
+ while (iterator.hasNext())
+ {
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(iterator.next());
+ keys.add(cDef);
+ }
+
+ String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
+ logger.debug("row key validator: {}", validator);
+ AbstractType<?> keyValidator = parseType(validator);
+
+ Iterator<ColumnDef> keyItera = keys.iterator();
+ if (keyValidator instanceof CompositeType)
+ {
+ Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
+ while (typeItera.hasNext())
+ keyItera.next().validation_class = typeItera.next().toString();
+ }
+ else
+ keyItera.next().validation_class = keyValidator.toString();
+
+ validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
+ logger.debug("cluster key validator: {}", validator);
+
+ if (keyItera.hasNext() && validator != null && !validator.isEmpty())
+ {
+ AbstractType<?> clusterKeyValidator = parseType(validator);
+
+ if (clusterKeyValidator instanceof CompositeType)
+ {
+ Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
+ while (keyItera.hasNext())
+ keyItera.next().validation_class = typeItera.next().toString();
+ }
+ else
+ keyItera.next().validation_class = clusterKeyValidator.toString();
+ }
+
+ // compact value_alias column
+ if (cqlRow.columns.get(5).value != null)
+ {
+ try
+ {
+ String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
+ logger.debug("default validator: {}", compactValidator);
+ AbstractType<?> defaultValidator = parseType(compactValidator);
+
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = cqlRow.columns.get(5).value;
+ cDef.validation_class = defaultValidator.toString();
+ keys.add(cDef);
+ hasCompactValueAlias = true;
+ }
+ catch (Exception e)
+ {
+ JVMStabilityInspector.inspectThrowable(e);
+ // no compact column at value_alias
+ }
+ }
+
+ }
+ return keys;
+ }
+
+
+ /** output: (((name, value), (name, value)), (value ... value), (value...value)) */
+ public void putNext(Tuple t) throws IOException
+ {
+ if (t.size() < 1)
+ {
+ // simply nothing here, we can't even delete without a key
+ logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
+ return;
+ }
+
+ if (t.getType(0) == DataType.TUPLE)
+ {
+ if (t.getType(1) == DataType.TUPLE)
+ {
+ Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
+ cqlQueryFromTuple(key, t, 1);
+ }
+ else
+ throw new IOException("Second argument in output must be a tuple");
+ }
+ else
+ throw new IOException("First argument in output must be a tuple");
+ }
+
+ /** convert key tuple to key map */
+ private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException
+ {
+ Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>();
+ for (int i = 0; i < t.size(); i++)
+ {
+ if (t.getType(i) == DataType.TUPLE)
+ {
+ Tuple inner = (Tuple) t.get(i);
+ if (inner.size() == 2)
+ {
+ Object name = inner.get(0);
+ if (name != null)
+ {
+ keys.put(name.toString(), objToBB(inner.get(1)));
+ }
+ else
+ throw new IOException("Key name was empty");
+ }
+ else
+ throw new IOException("Keys were not in name and value pairs");
+ }
+ else
+ {
+ throw new IOException("keys was not a tuple");
+ }
+ }
+ return keys;
+ }
+
+ /** send CQL query request using data from tuple */
+ private void cqlQueryFromTuple(Map<String, ByteBuffer> key, Tuple t, int offset) throws IOException
+ {
+ for (int i = offset; i < t.size(); i++)
+ {
+ if (t.getType(i) == DataType.TUPLE)
+ {
+ Tuple inner = (Tuple) t.get(i);
+ if (inner.size() > 0)
+ {
+ List<ByteBuffer> bindedVariables = bindedVariablesFromTuple(inner);
+ if (bindedVariables.size() > 0)
+ sendCqlQuery(key, bindedVariables);
+ else
+ throw new IOException("Missing binded variables");
+ }
+ }
+ else
+ {
+ throw new IOException("Output type was not a tuple");
+ }
+ }
+ }
+
+ /** compose a list of binded variables */
+ private List<ByteBuffer> bindedVariablesFromTuple(Tuple t) throws IOException
+ {
+ List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
+ for (int i = 0; i < t.size(); i++)
+ variables.add(objToBB(t.get(i)));
+ return variables;
+ }
+
+ /** have the record writer write the data by executing the CQL query */
+ private void sendCqlQuery(Map<String, ByteBuffer> key, List<ByteBuffer> bindedVariables) throws IOException
+ {
+ try
+ {
+ writer.write(key, bindedVariables);
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ /** schema: (value, value, value) where keys are in the front. */
+ public ResourceSchema getSchema(String location, Job job) throws IOException
+ {
+ setLocation(location, job);
+ CfInfo cfInfo = getCfInfo(loadSignature);
+ CfDef cfDef = cfInfo.cfDef;
+ // top-level schema, no type
+ ResourceSchema schema = new ResourceSchema();
+
+ // get default marshallers and validators
+ Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfDef);
+
+ // will contain all fields for this schema
+ List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
+
+ for (ColumnDef cdef : cfDef.column_metadata)
+ {
+ ResourceFieldSchema valSchema = new ResourceFieldSchema();
+ AbstractType validator = validators.get(cdef.name);
+ if (validator == null)
+ validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
+ valSchema.setName(new String(cdef.getName()));
+ valSchema.setType(getPigType(validator));
+ allSchemaFields.add(valSchema);
+ }
+
+ // top level schema contains everything
+ schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
+ return schema;
+ }
+
+ public void setPartitionFilter(Expression partitionFilter) throws IOException
+ {
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
+ property.setProperty(PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter));
+ }
+
+ /**
+ * Return cql where clauses for the corresponding partition filter. Make sure the data format matches
+ * Only support the following Pig data types: int, long, float, double, boolean and chararray
+ * */
+ private String partitionFilterToWhereClauseString(Expression expression) throws IOException
+ {
+ Expression.BinaryExpression be = (Expression.BinaryExpression) expression;
+ OpType op = expression.getOpType();
+ String opString = op.toString();
+ switch (op)
+ {
+ case OP_EQ:
+ opString = " = ";
+ case OP_GE:
+ case OP_GT:
+ case OP_LE:
+ case OP_LT:
+ String name = be.getLhs().toString();
+ String value = be.getRhs().toString();
+ return String.format("%s %s %s", name, opString, value);
+ case OP_AND:
+ return String.format("%s AND %s", partitionFilterToWhereClauseString(be.getLhs()), partitionFilterToWhereClauseString(be.getRhs()));
+ default:
+ throw new IOException("Unsupported expression type: " + opString);
+ }
+ }
+
+ /** retrieve where clause for partition filter */
+ private String getWhereClauseForPartitionFilter()
+ {
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
+ return property.getProperty(PARTITION_FILTER_SIGNATURE);
+ }
+
/** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
@@ -190,6 +631,16 @@ public class CqlNativeStorage extends CqlStorage
if (whereClause != null)
CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+ String whereClauseForPartitionFilter = getWhereClauseForPartitionFilter();
+ String wc = whereClause != null && !whereClause.trim().isEmpty()
+ ? whereClauseForPartitionFilter == null ? whereClause: String.format("%s AND %s", whereClause.trim(), whereClauseForPartitionFilter)
+ : whereClauseForPartitionFilter;
+
+ if (wc != null)
+ {
+ logger.debug("where clause: {}", wc);
+ CqlConfigHelper.setInputWhereClauses(conf, wc);
+ }
if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
{
try
@@ -212,6 +663,44 @@ public class CqlNativeStorage extends CqlStorage
initSchema(loadSignature);
}
+ /** set store configuration settings */
+ public void setStoreLocation(String location, Job job) throws IOException
+ {
+ conf = HadoopCompat.getConfiguration(job);
+ setLocationFromUri(location);
+
+ if (username != null && password != null)
+ ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
+ if (splitSize > 0)
+ ConfigHelper.setInputSplitSize(conf, splitSize);
+ if (partitionerClass!= null)
+ ConfigHelper.setOutputPartitioner(conf, partitionerClass);
+ if (rpcPort != null)
+ {
+ ConfigHelper.setOutputRpcPort(conf, rpcPort);
+ ConfigHelper.setInputRpcPort(conf, rpcPort);
+ }
+ if (initHostAddress != null)
+ {
+ ConfigHelper.setOutputInitialAddress(conf, initHostAddress);
+ ConfigHelper.setInputInitialAddress(conf, initHostAddress);
+ }
+
+ ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
+ CqlConfigHelper.setOutputCql(conf, outputQuery);
+
+ setConnectionInformation();
+
+ if (ConfigHelper.getOutputRpcPort(conf) == 0)
+ throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
+ if (ConfigHelper.getOutputInitialAddress(conf) == null)
+ throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+ if (ConfigHelper.getOutputPartitioner(conf) == null)
+ throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+
+ initSchema(storeSignature);
+ }
+
private void setLocationFromUri(String location) throws IOException
{
try
@@ -320,4 +809,11 @@ public class CqlNativeStorage extends CqlStorage
}
}
+ /**
+ * Thrift API can't handle null, so use empty byte array
+ */
+ public ByteBuffer nullToBB()
+ {
+ return ByteBuffer.wrap(new byte[0]);
+ }
}
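One dense spot above is the where-clause merge in setLocation: the where_clause taken from the LOAD url and the clause derived from setPartitionFilter are combined with AND only when both are present. A minimal sketch of just that decision (class and method names are invented for illustration):

public class WhereClauseMerge
{
    static String merge(String whereClause, String partitionFilterClause)
    {
        boolean hasUserClause = whereClause != null && !whereClause.trim().isEmpty();
        if (!hasUserClause)
            return partitionFilterClause;          // may be null: no clause at all
        if (partitionFilterClause == null)
            return whereClause;
        return String.format("%s AND %s", whereClause.trim(), partitionFilterClause);
    }

    public static void main(String[] args)
    {
        System.out.println(merge("a > 1", "b = 2"));   // a > 1 AND b = 2
        System.out.println(merge(null, "b = 2"));      // b = 2
        System.out.println(merge("a > 1", null));      // a > 1
    }
}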
http://git-wip-us.apache.org/repos/asf/cassandra/blob/359d3bb2/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 08926fa..c7277fa 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -17,708 +17,25 @@
*/
package org.apache.cassandra.hadoop.pig;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.*;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.BufferCell;
-import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.hadoop.*;
-import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.pig.Expression;
-import org.apache.pig.Expression.OpType;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.*;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
- * A LoadStoreFunc for retrieving data from and storing data to Cassandra
- *
- * A row from a standard CF will be returned as nested tuples:
- * (((key1, value1), (key2, value2)), ((name1, val1), (name2, val2))).
+ * @deprecated use CqlNativeStorage instead. CqlStorage will be removed.
*/
-public class CqlStorage extends AbstractCassandraStorage
+public class CqlStorage extends CqlNativeStorage
{
- private static final Logger logger = LoggerFactory.getLogger(CqlStorage.class);
- private RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> reader;
- protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
+ private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class);
- protected int pageSize = 1000;
- protected String columns;
- protected String outputQuery;
- protected String whereClause;
- private boolean hasCompactValueAlias = false;
-
public CqlStorage()
{
this(1000);
+ logger.warn("CqlStorage is deprecated and will be removed in the next release, use CqlNativeStorage instead.");
}
/** @param pageSize limit number of CQL rows to fetch in a thrift request */
public CqlStorage(int pageSize)
{
- super();
- this.pageSize = pageSize;
- DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlInputFormat";
- DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlOutputFormat";
- }
-
- public void prepareToRead(RecordReader reader, PigSplit split)
- {
- this.reader = reader;
- }
-
- /** get next row */
- public Tuple getNext() throws IOException
- {
- try
- {
- // load the next pair
- if (!reader.nextKeyValue())
- return null;
-
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
- Map<String, ByteBuffer> keys = reader.getCurrentKey();
- Map<String, ByteBuffer> columns = reader.getCurrentValue();
- assert keys != null && columns != null;
-
- // add key columns to the map
- for (Map.Entry<String,ByteBuffer> key : keys.entrySet())
- columns.put(key.getKey(), key.getValue());
-
- Tuple tuple = TupleFactory.getInstance().newTuple(cfDef.column_metadata.size());
- Iterator<ColumnDef> itera = cfDef.column_metadata.iterator();
- int i = 0;
- while (itera.hasNext())
- {
- ColumnDef cdef = itera.next();
- ByteBuffer columnValue = columns.get(ByteBufferUtil.string(cdef.name.duplicate()));
- if (columnValue != null)
- {
- Cell cell = new BufferCell(CellNames.simpleDense(cdef.name), columnValue);
- AbstractType<?> validator = getValidatorMap(cfDef).get(cdef.name);
- setTupleValue(tuple, i, cqlColumnToObj(cell, cfDef), validator);
- }
- else
- tuple.set(i, null);
- i++;
- }
- return tuple;
- }
- catch (InterruptedException e)
- {
- throw new IOException(e.getMessage());
- }
- }
-
- /** set the value to the position of the tuple */
- protected void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
- {
- if (validator instanceof CollectionType)
- setCollectionTupleValues(tuple, position, value, validator);
- else
- setTupleValue(tuple, position, value);
- }
-
- /** set the values of set/list at and after the position of the tuple */
- private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
- {
- if (validator instanceof MapType)
- {
- setMapTupleValues(tuple, position, value, validator);
- return;
- }
- AbstractType elementValidator;
- if (validator instanceof SetType)
- elementValidator = ((SetType<?>) validator).getElementsType();
- else if (validator instanceof ListType)
- elementValidator = ((ListType<?>) validator).getElementsType();
- else
- return;
-
- int i = 0;
- Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
- for (Object entry : (Collection<?>) value)
- {
- setTupleValue(innerTuple, i, cassandraToPigData(entry, elementValidator), elementValidator);
- i++;
- }
- tuple.set(position, innerTuple);
- }
-
- /** set the values of set/list at and after the position of the tuple */
- private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
- {
- AbstractType<?> keyValidator = ((MapType<?, ?>) validator).getKeysType();
- AbstractType<?> valueValidator = ((MapType<?, ?>) validator).getValuesType();
-
- int i = 0;
- Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
- for(Map.Entry<?,?> entry : ((Map<Object, Object>)value).entrySet())
- {
- Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
- setTupleValue(mapEntryTuple, 0, cassandraToPigData(entry.getKey(), keyValidator), keyValidator);
- setTupleValue(mapEntryTuple, 1, cassandraToPigData(entry.getValue(), valueValidator), valueValidator);
- innerTuple.set(i, mapEntryTuple);
- i++;
- }
- tuple.set(position, innerTuple);
- }
-
- /** convert a cql column to an object */
- protected Object cqlColumnToObj(Cell col, CfDef cfDef) throws IOException
- {
- // standard
- Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- ByteBuffer cellName = col.name().toByteBuffer();
- if (validators.get(cellName) == null)
- return cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.DEFAULT_VALIDATOR), col.value());
- else
- return cassandraToObj(validators.get(cellName), col.value());
- }
-
- /** set read configuration settings */
- public void setLocation(String location, Job job) throws IOException
- {
- conf = HadoopCompat.getConfiguration(job);
- setLocationFromUri(location);
-
- if (username != null && password != null)
- ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
- if (splitSize > 0)
- ConfigHelper.setInputSplitSize(conf, splitSize);
- if (partitionerClass!= null)
- ConfigHelper.setInputPartitioner(conf, partitionerClass);
- if (rpcPort != null)
- ConfigHelper.setInputRpcPort(conf, rpcPort);
- if (initHostAddress != null)
- ConfigHelper.setInputInitialAddress(conf, initHostAddress);
-
- ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
- setConnectionInformation();
-
- CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
- if (columns != null && !columns.trim().isEmpty())
- CqlConfigHelper.setInputColumns(conf, columns);
-
- String whereClauseForPartitionFilter = getWhereClauseForPartitionFilter();
- String wc = whereClause != null && !whereClause.trim().isEmpty()
- ? whereClauseForPartitionFilter == null ? whereClause: String.format("%s AND %s", whereClause.trim(), whereClauseForPartitionFilter)
- : whereClauseForPartitionFilter;
-
- if (wc != null)
- {
- logger.debug("where clause: {}", wc);
- CqlConfigHelper.setInputWhereClauses(conf, wc);
- }
-
- if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
- {
- try
- {
- ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE)));
- }
- catch (NumberFormatException e)
- {
- throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e);
- }
- }
-
- if (ConfigHelper.getInputRpcPort(conf) == 0)
- throw new IOException("PIG_INPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
- if (ConfigHelper.getInputInitialAddress(conf) == null)
- throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
- if (ConfigHelper.getInputPartitioner(conf) == null)
- throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
- if (loadSignature == null)
- loadSignature = location;
-
- initSchema(loadSignature);
- }
-
- /** set store configuration settings */
- public void setStoreLocation(String location, Job job) throws IOException
- {
- conf = HadoopCompat.getConfiguration(job);
- setLocationFromUri(location);
-
- if (username != null && password != null)
- ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
- if (splitSize > 0)
- ConfigHelper.setInputSplitSize(conf, splitSize);
- if (partitionerClass!= null)
- ConfigHelper.setOutputPartitioner(conf, partitionerClass);
- if (rpcPort != null)
- {
- ConfigHelper.setOutputRpcPort(conf, rpcPort);
- ConfigHelper.setInputRpcPort(conf, rpcPort);
- }
- if (initHostAddress != null)
- {
- ConfigHelper.setOutputInitialAddress(conf, initHostAddress);
- ConfigHelper.setInputInitialAddress(conf, initHostAddress);
- }
-
- ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
- CqlConfigHelper.setOutputCql(conf, outputQuery);
-
- setConnectionInformation();
-
- if (ConfigHelper.getOutputRpcPort(conf) == 0)
- throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
- if (ConfigHelper.getOutputInitialAddress(conf) == null)
- throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
- if (ConfigHelper.getOutputPartitioner(conf) == null)
- throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
-
- initSchema(storeSignature);
- }
-
- /** schema: (value, value, value) where keys are in the front. */
- public ResourceSchema getSchema(String location, Job job) throws IOException
- {
- setLocation(location, job);
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
- // top-level schema, no type
- ResourceSchema schema = new ResourceSchema();
-
- // get default marshallers and validators
- Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
- Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfDef);
-
- // will contain all fields for this schema
- List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
-
- for (ColumnDef cdef : cfDef.column_metadata)
- {
- ResourceFieldSchema valSchema = new ResourceFieldSchema();
- AbstractType validator = validators.get(cdef.name);
- if (validator == null)
- validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
- valSchema.setName(new String(cdef.getName()));
- valSchema.setType(getPigType(validator));
- allSchemaFields.add(valSchema);
- }
-
- // top level schema contains everything
- schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
- return schema;
- }
-
- public void setPartitionFilter(Expression partitionFilter) throws IOException
- {
- UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- property.setProperty(PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter));
- }
-
- /** retrieve where clause for partition filter */
- private String getWhereClauseForPartitionFilter()
- {
- UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- return property.getProperty(PARTITION_FILTER_SIGNATURE);
- }
-
- public void prepareToWrite(RecordWriter writer)
- {
- this.writer = writer;
- }
-
- /** output: (((name, value), (name, value)), (value ... value), (value...value)) */
- public void putNext(Tuple t) throws IOException
- {
- if (t.size() < 1)
- {
- // simply nothing here, we can't even delete without a key
- logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
- return;
- }
-
- if (t.getType(0) == DataType.TUPLE)
- {
- if (t.getType(1) == DataType.TUPLE)
- {
- Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
- cqlQueryFromTuple(key, t, 1);
- }
- else
- throw new IOException("Second argument in output must be a tuple");
- }
- else
- throw new IOException("First argument in output must be a tuple");
- }
-
- /** convert key tuple to key map */
- private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException
- {
- Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>();
- for (int i = 0; i < t.size(); i++)
- {
- if (t.getType(i) == DataType.TUPLE)
- {
- Tuple inner = (Tuple) t.get(i);
- if (inner.size() == 2)
- {
- Object name = inner.get(0);
- if (name != null)
- {
- keys.put(name.toString(), objToBB(inner.get(1)));
- }
- else
- throw new IOException("Key name was empty");
- }
- else
- throw new IOException("Keys were not in name and value pairs");
- }
- else
- {
- throw new IOException("keys was not a tuple");
- }
- }
- return keys;
- }
-
- /** send CQL query request using data from tuple */
- private void cqlQueryFromTuple(Map<String, ByteBuffer> key, Tuple t, int offset) throws IOException
- {
- for (int i = offset; i < t.size(); i++)
- {
- if (t.getType(i) == DataType.TUPLE)
- {
- Tuple inner = (Tuple) t.get(i);
- if (inner.size() > 0)
- {
-
- List<ByteBuffer> bindedVariables = bindedVariablesFromTuple(inner);
- if (bindedVariables.size() > 0)
- sendCqlQuery(key, bindedVariables);
- else
- throw new IOException("Missing binded variables");
- }
- }
- else
- {
- throw new IOException("Output type was not a tuple");
- }
- }
- }
-
- /** compose a list of binded variables */
- private List<ByteBuffer> bindedVariablesFromTuple(Tuple t) throws IOException
- {
- List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
- for (int i = 0; i < t.size(); i++)
- variables.add(objToBB(t.get(i)));
- return variables;
- }
-
- /** writer write the data by executing CQL query */
- private void sendCqlQuery(Map<String, ByteBuffer> key, List<ByteBuffer> bindedVariables) throws IOException
- {
- try
- {
- writer.write(key, bindedVariables);
- }
- catch (InterruptedException e)
- {
- throw new IOException(e);
- }
- }
-
- /** include key columns */
- protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
- throws InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException,
- TException,
- CharacterCodingException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- NotFoundException
- {
- List<ColumnDef> keyColumns = null;
- // get key columns
- try
- {
- keyColumns = getKeysMeta(client);
- }
- catch(Exception e)
- {
- logger.error("Error in retrieving key columns" , e);
- }
-
- // get other columns
- List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias);
-
- // combine all columns in a list
- if (keyColumns != null && columns != null)
- keyColumns.addAll(columns);
-
- return keyColumns;
- }
-
- /** get keys meta data */
- protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
- throws Exception
- {
- String query = "SELECT key_aliases, " +
- " column_aliases, " +
- " key_validator, " +
- " comparator, " +
- " keyspace_name, " +
- " value_alias, " +
- " default_validator " +
- "FROM system.schema_columnfamilies " +
- "WHERE keyspace_name = '%s'" +
- " AND columnfamily_name = '%s' ";
-
- CqlResult result = client.execute_cql3_query(
- ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
- Compression.NONE,
- ConsistencyLevel.ONE);
-
- if (result == null || result.rows == null || result.rows.isEmpty())
- return null;
-
- Iterator<CqlRow> iteraRow = result.rows.iterator();
- List<ColumnDef> keys = new ArrayList<ColumnDef>();
- if (iteraRow.hasNext())
- {
- CqlRow cqlRow = iteraRow.next();
- String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
- logger.debug("Found ksDef name: {}", name);
- String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
-
- logger.debug("partition keys: {}", keyString);
- List<String> keyNames = FBUtilities.fromJsonList(keyString);
-
- Iterator<String> iterator = keyNames.iterator();
- while (iterator.hasNext())
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(iterator.next());
- keys.add(cDef);
- }
- // classic thrift tables
- if (keys.size() == 0)
- {
- CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
- for (ColumnDefinition def : cfm.partitionKeyColumns())
- {
- String key = def.name.toString();
- logger.debug("name: {} ", key);
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(key);
- keys.add(cDef);
- }
- for (ColumnDefinition def : cfm.clusteringColumns())
- {
- String key = def.name.toString();
- logger.debug("name: {} ", key);
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(key);
- keys.add(cDef);
- }
- }
-
- keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
-
- logger.debug("cluster keys: {}", keyString);
- keyNames = FBUtilities.fromJsonList(keyString);
-
- iterator = keyNames.iterator();
- while (iterator.hasNext())
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(iterator.next());
- keys.add(cDef);
- }
-
- String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
- logger.debug("row key validator: {}", validator);
- AbstractType<?> keyValidator = parseType(validator);
-
- Iterator<ColumnDef> keyItera = keys.iterator();
- if (keyValidator instanceof CompositeType)
- {
- Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
- while (typeItera.hasNext())
- keyItera.next().validation_class = typeItera.next().toString();
- }
- else
- keyItera.next().validation_class = keyValidator.toString();
-
- validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
- logger.debug("cluster key validator: {}", validator);
-
- if (keyItera.hasNext() && validator != null && !validator.isEmpty())
- {
- AbstractType<?> clusterKeyValidator = parseType(validator);
-
- if (clusterKeyValidator instanceof CompositeType)
- {
- Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
- while (keyItera.hasNext())
- keyItera.next().validation_class = typeItera.next().toString();
- }
- else
- keyItera.next().validation_class = clusterKeyValidator.toString();
- }
-
- // compact value_alias column
- if (cqlRow.columns.get(5).value != null)
- {
- try
- {
- String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
- logger.debug("default validator: {}", compactValidator);
- AbstractType<?> defaultValidator = parseType(compactValidator);
-
- ColumnDef cDef = new ColumnDef();
- cDef.name = cqlRow.columns.get(5).value;
- cDef.validation_class = defaultValidator.toString();
- keys.add(cDef);
- hasCompactValueAlias = true;
- }
- catch (Exception e)
- {
- JVMStabilityInspector.inspectThrowable(e);
- // no compact column at value_alias
- }
- }
-
- }
- return keys;
- }
-
- /** cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>]
- * [&columns=<col1,col2>][&output_query=<prepared_statement_query>][&where_clause=<clause>]
- * [&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]] */
- private void setLocationFromUri(String location) throws IOException
- {
- try
- {
- if (!location.startsWith("cql://"))
- throw new Exception("Bad scheme: " + location);
-
- String[] urlParts = location.split("\\?");
- if (urlParts.length > 1)
- {
- Map<String, String> urlQuery = getQueryMap(urlParts[1]);
-
- // each page row size
- if (urlQuery.containsKey("page_size"))
- pageSize = Integer.parseInt(urlQuery.get("page_size"));
-
- // input query select columns
- if (urlQuery.containsKey("columns"))
- columns = urlQuery.get("columns");
-
- // output prepared statement
- if (urlQuery.containsKey("output_query"))
- outputQuery = urlQuery.get("output_query");
-
- // user defined where clause
- if (urlQuery.containsKey("where_clause"))
- whereClause = urlQuery.get("where_clause");
-
- //split size
- if (urlQuery.containsKey("split_size"))
- splitSize = Integer.parseInt(urlQuery.get("split_size"));
- if (urlQuery.containsKey("partitioner"))
- partitionerClass = urlQuery.get("partitioner");
- if (urlQuery.containsKey("use_secondary"))
- usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
- if (urlQuery.containsKey("init_address"))
- initHostAddress = urlQuery.get("init_address");
- if (urlQuery.containsKey("rpc_port"))
- rpcPort = urlQuery.get("rpc_port");
- }
- String[] parts = urlParts[0].split("/+");
- String[] credentialsAndKeyspace = parts[1].split("@");
- if (credentialsAndKeyspace.length > 1)
- {
- String[] credentials = credentialsAndKeyspace[0].split(":");
- username = credentials[0];
- password = credentials[1];
- keyspace = credentialsAndKeyspace[1];
- }
- else
- {
- keyspace = parts[1];
- }
- column_family = parts[2];
- }
- catch (Exception e)
- {
- throw new IOException("Expected 'cql://[username:password@]<keyspace>/<columnfamily>" +
- "[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]" +
- "[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]" +
- "[&init_address=<host>][&rpc_port=<port>]]': " + e.getMessage());
- }
- }
-
- /**
- * Return cql where clauses for the corresponding partition filter. Make sure the data format matches
- * Only support the following Pig data types: int, long, float, double, boolean and chararray
- * */
- private String partitionFilterToWhereClauseString(Expression expression) throws IOException
- {
- Expression.BinaryExpression be = (Expression.BinaryExpression) expression;
- OpType op = expression.getOpType();
- String opString = op.toString();
- switch (op)
- {
- case OP_EQ:
- opString = " = ";
- case OP_GE:
- case OP_GT:
- case OP_LE:
- case OP_LT:
- String name = be.getLhs().toString();
- String value = be.getRhs().toString();
- return String.format("%s %s %s", name, opString, value);
- case OP_AND:
- return String.format("%s AND %s", partitionFilterToWhereClauseString(be.getLhs()), partitionFilterToWhereClauseString(be.getRhs()));
- default:
- throw new IOException("Unsupported expression type: " + opString);
- }
- }
-
- private Object cassandraToPigData(Object obj, AbstractType validator)
- {
- if (validator instanceof DecimalType || validator instanceof InetAddressType)
- return validator.getString(validator.decompose(obj));
- return obj;
- }
-
- /**
- * Thrift API can't handle null, so use empty byte array
- */
- public ByteBuffer nullToBB()
- {
- return ByteBuffer.wrap(new byte[0]);
+ super(pageSize);
}
}
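Since CqlStorage is now a thin deprecated subclass, existing scripts only need to swap the USING clause to CqlNativeStorage. A hedged sketch of doing that from embedded Pig (keyspace and table are placeholders, and the usual PIG_* connection environment variables or URL parameters still have to be set):

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

public class MigrationExample
{
    public static void main(String[] args) throws Exception
    {
        PigServer pig = new PigServer(ExecType.LOCAL);
        pig.registerQuery(
            "rows = LOAD 'cql://cql3ks/test' " +
            "USING org.apache.cassandra.hadoop.pig.CqlNativeStorage();");
        pig.dumpSchema("rows");   // triggers schema resolution against Cassandra
    }
}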