You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/06/26 23:56:36 UTC
[1/2] Pig: support for cql3 tables Patch by Alex Liu,
reviewed by brandonwilliams for CASSANDRA-5234
Updated Branches:
refs/heads/cassandra-1.2 54266ea70 -> 33a3d2ca5
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a3d2ca/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
new file mode 100644
index 0000000..004b319
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.pig;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.*;
+
+
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.pig.Expression;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.*;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A LoadStoreFunc for retrieving data from and storing data to CQL3 tables in Cassandra.
+ *
+ * On load, each CQL row is returned as a flat tuple holding one (name, value) pair per column of the
+ * table's column metadata, key columns first (see getColumnMetadata):
+ * ((key1, value1), (key2, value2), (name1, val1), (name2, val2)).
+ * A column that has no value in the row is returned as an empty tuple.
+ *
+ * On store, each tuple must look like (((name, value), (name, value)), (value ... value), ...):
+ * the first element holds the key name/value pairs and every following tuple holds the bound
+ * variables for one write through the configured output_query.
+ */
+public class CqlStorage extends AbstractCassandraStorage
+{
+    private static final Logger logger = LoggerFactory.getLogger(CqlStorage.class);
+
+    private RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> reader;
+    private RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
+
+    private int pageSize = 1000;
+    private String columns;
+    private String outputQuery;
+    private String whereClause;
+
+    /** Creates a storage that fetches 1000 CQL rows per thrift request (overridable via the page_size URI parameter). */
+    public CqlStorage()
+    {
+        this(1000);
+    }
+
+    /** @param pageSize number of CQL rows to fetch in a thrift request */
+    public CqlStorage(int pageSize)
+    {
+        super();
+        this.pageSize = pageSize;
+        DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat";
+        DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlOutputFormat";
+    }
+
+    @Override
+    public void prepareToRead(RecordReader reader, PigSplit split)
+    {
+        this.reader = reader;
+    }
+
+    /**
+     * Returns the next CQL row as a tuple of (name, value) pairs, or null when the input is exhausted.
+     *
+     * @throws IOException if reading fails or the read is interrupted (the interrupt flag is restored)
+     */
+    @Override
+    public Tuple getNext() throws IOException
+    {
+        try
+        {
+            // load the next pair
+            if (!reader.nextKeyValue())
+                return null;
+
+            CfDef cfDef = getCfDef(loadSignature);
+            Map<String, ByteBuffer> keys = reader.getCurrentKey();
+            Map<String, ByteBuffer> values = reader.getCurrentValue();
+            assert keys != null && values != null;
+
+            // merge key and regular columns into a fresh map instead of mutating the reader's value;
+            // key columns win on a name clash
+            Map<String, ByteBuffer> rowColumns = new HashMap<String, ByteBuffer>(values);
+            rowColumns.putAll(keys);
+
+            Tuple tuple = TupleFactory.getInstance().newTuple(cfDef.column_metadata.size());
+            int i = 0;
+            for (ColumnDef cdef : cfDef.column_metadata)
+            {
+                ByteBuffer columnValue = rowColumns.get(ByteBufferUtil.string(cdef.name.duplicate()));
+                if (columnValue != null)
+                {
+                    IColumn column = new Column(cdef.name, columnValue);
+                    tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance));
+                }
+                else
+                {
+                    // the row has no value for this column
+                    tuple.set(i, TupleFactory.getInstance().newTuple());
+                }
+                i++;
+            }
+            return tuple;
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Configures the job for reading.
+     *
+     * Parses the location URI and applies credentials, split size, partitioner, page size, selected
+     * columns and the where clause. It then validates the connection settings and initializes the
+     * schema under loadSignature.
+     */
+    @Override
+    public void setLocation(String location, Job job) throws IOException
+    {
+        conf = job.getConfiguration();
+        setLocationFromUri(location);
+
+        if (username != null && password != null)
+            ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
+        if (splitSize > 0)
+            ConfigHelper.setInputSplitSize(conf, splitSize);
+        if (partitionerClass != null)
+            ConfigHelper.setInputPartitioner(conf, partitionerClass);
+
+        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
+        setConnectionInformation();
+
+        CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
+        if (columns != null && !columns.trim().isEmpty())
+            CqlConfigHelper.setInputColumns(conf, columns);
+        if (whereClause != null && !whereClause.trim().isEmpty())
+            CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+
+        // the environment variable takes precedence over the split_size URI parameter
+        if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+        {
+            try
+            {
+                ConfigHelper.setInputSplitSize(conf, Integer.valueOf(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+            }
+            catch (NumberFormatException e)
+            {
+                throw new RuntimeException("PIG_INPUT_SPLIT_SIZE is not a number", e);
+            }
+        }
+
+        if (ConfigHelper.getInputRpcPort(conf) == 0)
+            throw new IOException("PIG_INPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
+        if (ConfigHelper.getInputInitialAddress(conf) == null)
+            throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+        if (ConfigHelper.getInputPartitioner(conf) == null)
+            throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+        if (loadSignature == null)
+            loadSignature = location;
+
+        initSchema(loadSignature);
+    }
+
+    /**
+     * Configures the job for writing.
+     *
+     * Parses the location URI and applies credentials, partitioner, output column family and the
+     * output_query prepared statement. It then validates the connection settings and initializes the
+     * schema under storeSignature.
+     */
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException
+    {
+        conf = job.getConfiguration();
+        setLocationFromUri(location);
+
+        if (username != null && password != null)
+            ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
+        // NOTE(review): this sets the *input* split size while configuring output; confirm it is intended
+        if (splitSize > 0)
+            ConfigHelper.setInputSplitSize(conf, splitSize);
+        if (partitionerClass != null)
+            ConfigHelper.setOutputPartitioner(conf, partitionerClass);
+
+        ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
+        CqlConfigHelper.setOutputCql(conf, outputQuery);
+
+        setConnectionInformation();
+
+        if (ConfigHelper.getOutputRpcPort(conf) == 0)
+            throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
+        if (ConfigHelper.getOutputInitialAddress(conf) == null)
+            throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+        if (ConfigHelper.getOutputPartitioner(conf) == null)
+            throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+
+        initSchema(storeSignature);
+    }
+
+    /**
+     * Returns the schema ((name, value), (name, value), ...), one tuple field per column of the column
+     * metadata, with key columns in front.
+     *
+     * The name is typed as UTF8 text. The value is typed by the column's validator, or by the table's
+     * default validator when the column has none.
+     */
+    @Override
+    public ResourceSchema getSchema(String location, Job job) throws IOException
+    {
+        setLocation(location, job);
+        CfDef cfDef = getCfDef(loadSignature);
+
+        // top-level schema, no type
+        ResourceSchema schema = new ResourceSchema();
+
+        // get default marshallers and validators
+        Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+        Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfDef);
+
+        // will contain all fields for this schema
+        List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
+
+        // defined validators/indexes
+        for (ColumnDef cdef : cfDef.column_metadata)
+        {
+            // make a new tuple for each col/val pair
+            ResourceSchema innerTupleSchema = new ResourceSchema();
+            ResourceFieldSchema innerTupleField = new ResourceFieldSchema();
+            innerTupleField.setType(DataType.TUPLE);
+            innerTupleField.setSchema(innerTupleSchema);
+            // decode the name as UTF-8 (as getNext does) rather than with the platform default charset
+            innerTupleField.setName(ByteBufferUtil.string(cdef.name.duplicate()));
+            ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
+            idxColSchema.setName("name");
+            idxColSchema.setType(getPigType(UTF8Type.instance));
+
+            ResourceFieldSchema valSchema = new ResourceFieldSchema();
+            AbstractType validator = validators.get(cdef.name);
+            if (validator == null)
+                validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
+            valSchema.setName("value");
+            valSchema.setType(getPigType(validator));
+
+            innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
+            allSchemaFields.add(innerTupleField);
+        }
+
+        // top level schema contains everything
+        schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
+        return schema;
+    }
+
+    /** We use CQL3 where clause to define the partition, so do nothing here. */
+    @Override
+    public String[] getPartitionKeys(String location, Job job)
+    {
+        return null;
+    }
+
+    /** We use CQL3 where clause to define the partition, so do nothing here. */
+    @Override
+    public void setPartitionFilter(Expression partitionFilter)
+    {
+    }
+
+    @Override
+    public void prepareToWrite(RecordWriter writer)
+    {
+        this.writer = writer;
+    }
+
+    /**
+     * Writes one output tuple: (((name, value), (name, value)), (value ... value), (value ... value)).
+     *
+     * @throws IOException if the first element is not a tuple of key pairs, or if there is no second
+     *         tuple element holding bound variables
+     */
+    @Override
+    public void putNext(Tuple t) throws IOException
+    {
+        if (t.size() < 1)
+        {
+            // simply nothing here, we can't even delete without a key
+            logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
+            return;
+        }
+
+        if (t.getType(0) != DataType.TUPLE)
+            throw new IOException("First argument in output must be a tuple");
+
+        Map<String, ByteBuffer> key = tupleToKeyMap((Tuple) t.get(0));
+        // check the size before reading index 1: a tuple holding only the key has nothing to write
+        if (t.size() < 2 || t.getType(1) != DataType.TUPLE)
+            throw new IOException("Second argument in output must be a tuple");
+        cqlQueryFromTuple(key, t, 1);
+    }
+
+    /** Converts a tuple of (name, value) key pairs into a map from key column name to serialized value. */
+    private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException
+    {
+        Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>();
+        for (int i = 0; i < t.size(); i++)
+        {
+            if (t.getType(i) != DataType.TUPLE)
+                throw new IOException("keys was not a tuple");
+
+            Tuple inner = (Tuple) t.get(i);
+            if (inner.size() != 2)
+                throw new IOException("Keys were not in name and value pairs");
+
+            Object name = inner.get(0);
+            if (name == null)
+                throw new IOException("Key name was empty");
+            keys.put(name.toString(), objToBB(inner.get(1)));
+        }
+        return keys;
+    }
+
+    /**
+     * Sends one write per tuple from {@code offset} onward, each supplying the bound variables.
+     * Empty tuples are skipped.
+     */
+    private void cqlQueryFromTuple(Map<String, ByteBuffer> key, Tuple t, int offset) throws IOException
+    {
+        for (int i = offset; i < t.size(); i++)
+        {
+            if (t.getType(i) != DataType.TUPLE)
+                throw new IOException("Output type was not a tuple");
+
+            Tuple inner = (Tuple) t.get(i);
+            if (inner.size() > 0)
+            {
+                List<ByteBuffer> boundVariables = bindedVariablesFromTuple(inner);
+                if (boundVariables.size() > 0)
+                    sendCqlQuery(key, boundVariables);
+                else
+                    throw new IOException("Missing binded variables");
+            }
+        }
+    }
+
+    /** Serializes every field of the tuple, in order, as a bound variable. */
+    private List<ByteBuffer> bindedVariablesFromTuple(Tuple t) throws IOException
+    {
+        List<ByteBuffer> variables = new ArrayList<ByteBuffer>(t.size());
+        for (int i = 0; i < t.size(); i++)
+            variables.add(objToBB(t.get(i)));
+        return variables;
+    }
+
+    /** Hands the key and bound variables to the record writer, which performs the CQL write. */
+    private void sendCqlQuery(Map<String, ByteBuffer> key, List<ByteBuffer> bindedVariables) throws IOException
+    {
+        try
+        {
+            writer.write(key, bindedVariables);
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Returns the key columns followed by the regular columns.
+     *
+     * If the key columns cannot be retrieved, the error is logged and only the regular columns are
+     * returned, rather than discarding everything.
+     */
+    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
+            throws InvalidRequestException,
+                   UnavailableException,
+                   TimedOutException,
+                   SchemaDisagreementException,
+                   TException,
+                   CharacterCodingException
+    {
+        List<ColumnDef> allColumns = new ArrayList<ColumnDef>();
+
+        // get key columns
+        try
+        {
+            List<ColumnDef> keyColumns = getKeysMeta(client);
+            if (keyColumns != null)
+                allColumns.addAll(keyColumns);
+        }
+        catch (IOException e)
+        {
+            logger.error("Error in retrieving key columns", e);
+        }
+
+        // get other columns
+        List<ColumnDef> regularColumns = getColumnMeta(client);
+        if (regularColumns != null)
+            allColumns.addAll(regularColumns);
+
+        return allColumns;
+    }
+
+    /**
+     * Parses cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>]
+     * [&columns=<col1,col2>][&output_query=<prepared_statement_query>][&where_clause=<clause>]
+     * [&split_size=<size>][&partitioner=<partitioner>]]
+     *
+     * @throws IOException wrapping the underlying cause when the location is malformed
+     */
+    private void setLocationFromUri(String location) throws IOException
+    {
+        try
+        {
+            if (!location.startsWith("cql://"))
+                throw new IllegalArgumentException("Bad scheme: " + location);
+
+            String[] urlParts = location.split("\\?");
+            if (urlParts.length > 1)
+            {
+                Map<String, String> urlQuery = getQueryMap(urlParts[1]);
+
+                // each page row size
+                if (urlQuery.containsKey("page_size"))
+                    pageSize = Integer.parseInt(urlQuery.get("page_size"));
+
+                // input query select columns
+                if (urlQuery.containsKey("columns"))
+                    columns = urlQuery.get("columns");
+
+                // output prepared statement: '#' stands for the '?' bind marker and '@' for '=',
+                // since '?' and '=' already delimit the location's query string
+                if (urlQuery.containsKey("output_query"))
+                    outputQuery = urlQuery.get("output_query").replace('#', '?').replace('@', '=');
+
+                // user defined where clause
+                if (urlQuery.containsKey("where_clause"))
+                    whereClause = urlQuery.get("where_clause");
+
+                // split size
+                if (urlQuery.containsKey("split_size"))
+                    splitSize = Integer.parseInt(urlQuery.get("split_size"));
+                if (urlQuery.containsKey("partitioner"))
+                    partitionerClass = urlQuery.get("partitioner");
+            }
+            String[] parts = urlParts[0].split("/+");
+            String[] credentialsAndKeyspace = parts[1].split("@");
+            if (credentialsAndKeyspace.length > 1)
+            {
+                // split on the first ':' only, so the password itself may contain ':'
+                String[] credentials = credentialsAndKeyspace[0].split(":", 2);
+                username = credentials[0];
+                password = credentials[1];
+                keyspace = credentialsAndKeyspace[1];
+            }
+            else
+            {
+                keyspace = parts[1];
+            }
+            column_family = parts[2];
+        }
+        catch (Exception e)
+        {
+            throw new IOException("Expected 'cql://[username:password@]<keyspace>/<columnfamily>" +
+                    "[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]" +
+                    "[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>]]': " + e.getMessage(), e);
+        }
+    }
+}
+
[2/2] git commit: Pig: support for cql3 tables Patch by Alex Liu,
reviewed by brandonwilliams for CASSANDRA-5234
Posted by br...@apache.org.
Pig: support for cql3 tables
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5234
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/33a3d2ca
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/33a3d2ca
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/33a3d2ca
Branch: refs/heads/cassandra-1.2
Commit: 33a3d2ca57e855ff5484fb039f14d424132db93b
Parents: 54266ea
Author: Brandon Williams <br...@apache.org>
Authored: Wed Jun 26 16:52:20 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Jun 26 16:52:20 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
examples/pig/README.txt | 21 +-
examples/pig/test/test_storage.pig | 2 +-
.../hadoop/ColumnFamilyRecordReader.java | 39 +-
.../hadoop/pig/AbstractCassandraStorage.java | 770 ++++++++++++++++
.../cassandra/hadoop/pig/CassandraStorage.java | 892 +++++--------------
.../apache/cassandra/hadoop/pig/CqlStorage.java | 447 ++++++++++
7 files changed, 1507 insertions(+), 665 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a3d2ca/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cb3fede..24d4c9e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.2.7
* Fix serialization of the LEFT gossip value (CASSANDRA-5696)
+ * Pig: support for cql3 tables (CASSANDRA-5234)
1.2.6
* Fix tracing when operation completes before all responses arrive (CASSANDRA-5668)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a3d2ca/examples/pig/README.txt
----------------------------------------------------------------------
diff --git a/examples/pig/README.txt b/examples/pig/README.txt
index e3d9af6..6dc0937 100644
--- a/examples/pig/README.txt
+++ b/examples/pig/README.txt
@@ -32,7 +32,10 @@ for input and output:
* PIG_OUTPUT_RPC_PORT : the port thrift is listening on for writing
* PIG_OUTPUT_PARTITIONER : cluster partitioner for writing
-Then you can run it like this:
+CassandraStorage
+================
+
+The CassandraStorage class is for any non-CQL3 ColumnFamilies you may have. For CQL3 support, refer to the CqlStorage section.
examples/pig$ bin/pig_cassandra -x local example-script.pig
@@ -71,8 +74,8 @@ already exist for this to work.
See the example in test/ to see how schema is inferred.
-Advanced Options
-================
+Advanced Options for CassandraStorage
+=====================================
The following environment variables default to false but can be set to true to enable them:
@@ -92,3 +95,15 @@ PIG_INPUT_SPLIT_SIZE: this sets the split size passed to Hadoop, controlling
the amount of mapper tasks created. This can also be set in the LOAD url by
adding the 'split_size=X' parameter, where X is an integer amount for the size.
+CqlStorage
+==========
+
+The CqlStorage class is somewhat similar to CassandraStorage, but it can work with CQL3-defined ColumnFamilies. The main difference is in the URL format:
+
+cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>][&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>]]
+
In grunt, the simplest example looks like this:
+
+grunt> rows = LOAD 'cql://MyKeyspace/MyColumnFamily' USING CqlStorage();
+
+CqlStorage handles wide rows automatically and thus has no separate flag for this.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a3d2ca/examples/pig/test/test_storage.pig
----------------------------------------------------------------------
diff --git a/examples/pig/test/test_storage.pig b/examples/pig/test/test_storage.pig
index 93dd91f..026cb02 100644
--- a/examples/pig/test/test_storage.pig
+++ b/examples/pig/test/test_storage.pig
@@ -1,4 +1,4 @@
-rows = LOAD 'cassandra://PigTest/SomeApp?widerows=true' USING CassandraStorage();
+rows = LOAD 'cassandra://PigTest/SomeApp' USING CassandraStorage();
-- full copy
STORE rows INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
-- single tuple
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a3d2ca/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index daef8ec..701260a 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -218,19 +218,32 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
{
try
{
- partitioner = FBUtilities.newPartitioner(client.describe_partitioner());
-
- // Get the Keyspace metadata, then get the specific CF metadata
- // in order to populate the sub/comparator.
- KsDef ks_def = client.describe_keyspace(keyspace);
- List<String> cfnames = new ArrayList<String>();
- for (CfDef cfd : ks_def.cf_defs)
- cfnames.add(cfd.name);
- int idx = cfnames.indexOf(cfName);
- CfDef cf_def = ks_def.cf_defs.get(idx);
-
- comparator = TypeParser.parse(cf_def.comparator_type);
- subComparator = cf_def.subcomparator_type == null ? null : TypeParser.parse(cf_def.subcomparator_type);
+ partitioner = FBUtilities.newPartitioner(client.describe_partitioner());
+ // get CF meta data
+ String query = "SELECT comparator," +
+ " subcomparator " +
+ "FROM system.schema_columnfamilies " +
+ "WHERE keyspace_name = '%s' " +
+ " AND columnfamily_name = '%s' ";
+
+ CqlResult result = client.execute_cql3_query(
+ ByteBufferUtil.bytes(String.format(query, keyspace, cfName)),
+ Compression.NONE,
+ ConsistencyLevel.ONE);
+
+ Iterator<CqlRow> iteraRow = result.rows.iterator();
+ CfDef cfDef = new CfDef();
+ if (iteraRow.hasNext())
+ {
+ CqlRow cqlRow = iteraRow.next();
+ cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(0).value);
+ ByteBuffer subComparator = cqlRow.columns.get(1).value;
+ if (subComparator != null)
+ cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
+ }
+
+ comparator = TypeParser.parse(cfDef.comparator_type);
+ subComparator = cfDef.subcomparator_type == null ? null : TypeParser.parse(cfDef.subcomparator_type);
}
catch (ConfigurationException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a3d2ca/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
new file mode 100644
index 0000000..ff575b2
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -0,0 +1,770 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.pig;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.*;
+
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
+import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.UUIDGen;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.pig.*;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A LoadStoreFunc for retrieving data from and storing data to Cassandra
+ */
+public abstract class AbstractCassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
+{
+ /** Keys of the map built by getDefaultMarshallers: the CfDef's comparator, default validator, key validator and subcomparator. */
+ protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
+
+ // system environment variables that can be set to configure connection info:
+ // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
+ // (setConnectionInformation applies the generic PIG_* values first, then lets PIG_INPUT_*/PIG_OUTPUT_* override them)
+ public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
+ public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
+ public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
+ public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
+ public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
+ public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
+ public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
+ public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
+ public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+ public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
+ public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
+ public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
+
+ // fallback format class names used by setConnectionInformation when PIG_INPUT_FORMAT / PIG_OUTPUT_FORMAT
+ // are unset; assigned by subclasses (e.g. in CqlStorage's constructor)
+ protected String DEFAULT_INPUT_FORMAT;
+ protected String DEFAULT_OUTPUT_FORMAT;
+
+ protected static final Logger logger = LoggerFactory.getLogger(AbstractCassandraStorage.class);
+
+ // connection target and credentials, filled in by subclasses from the location URI
+ protected String username;
+ protected String password;
+ protected String keyspace;
+ protected String column_family;
+ // UDF-context property keys under which the CfDef is looked up by getCfDef
+ protected String loadSignature;
+ protected String storeSignature;
+
+ // Hadoop job configuration, captured by subclasses when the load/store location is set
+ protected Configuration conf;
+ // format class names resolved by setConnectionInformation
+ protected String inputFormatClass;
+ protected String outputFormatClass;
+ // split size; subclasses may override it from the location URI (CqlStorage applies it only when > 0)
+ protected int splitSize = 64 * 1024;
+ // optional partitioner class name supplied through the location URI
+ protected String partitionerClass;
+
+    /**
+     * Creates the storage with LoadFunc's default state. Subclasses such as CqlStorage assign
+     * DEFAULT_INPUT_FORMAT and DEFAULT_OUTPUT_FORMAT in their own constructors.
+     */
+    public AbstractCassandraStorage()
+    {
+        // the superclass constructor is invoked implicitly; nothing else to initialise
+    }
+
+    /**
+     * Deconstructs a composite-typed value into a tuple with one field per component. Each component
+     * is composed with its own comparator and stored via setTupleValue.
+     */
+    protected Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException
+    {
+        List<CompositeComponent> components = comparator.deconstruct(name);
+        Tuple composed = TupleFactory.getInstance().newTuple(components.size());
+        int position = 0;
+        for (CompositeComponent component : components)
+        {
+            setTupleValue(composed, position, component.comparator.compose(component.value));
+            position++;
+        }
+        return composed;
+    }
+
+    /**
+     * Converts a column into a (name, value) tuple.
+     *
+     * The name is composed with {@code comparator}, component by component when the comparator is
+     * composite. A standard {@link Column}'s value is composed with that column's validator from
+     * {@code cfDef}, falling back to the default validator. A super column's value becomes a bag of
+     * sub-column tuples whose names are composed with {@code cfDef}'s subcomparator.
+     *
+     * @throws IOException if a type in {@code cfDef} cannot be parsed
+     */
+    protected Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
+    {
+        Tuple pair = TupleFactory.getInstance().newTuple(2);
+
+        // name
+        if (comparator instanceof AbstractCompositeType)
+            setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, col.name()));
+        else
+            setTupleValue(pair, 0, comparator.compose(col.name()));
+
+        // value
+        if (col instanceof Column)
+        {
+            // standard: look the validator up once; the default marshallers are only parsed when needed
+            AbstractType validator = getValidatorMap(cfDef).get(col.name());
+            if (validator == null)
+                validator = getDefaultMarshallers(cfDef).get(MarshallerType.DEFAULT_VALIDATOR);
+            setTupleValue(pair, 1, validator.compose(col.value()));
+        }
+        else
+        {
+            // super: every sub-column shares the same subcomparator, so parse it once outside the loop
+            AbstractType subComparator = parseType(cfDef.getSubcomparator_type());
+            ArrayList<Tuple> subcols = new ArrayList<Tuple>();
+            for (IColumn subcol : col.getSubColumns())
+                subcols.add(columnToTuple(subcol, cfDef, subComparator));
+
+            pair.set(1, new DefaultDataBag(subcols));
+        }
+        return pair;
+    }
+
+    /**
+     * Stores {@code value} at {@code position} in {@code pair}, first converting the types Pig cannot
+     * hold directly. A BigInteger becomes an int, a ByteBuffer or UUID becomes a DataByteArray, and a
+     * Date becomes a long. Anything else is stored unchanged.
+     */
+    protected void setTupleValue(Tuple pair, int position, Object value) throws ExecException
+    {
+        Object pigValue;
+        if (value instanceof BigInteger)
+        {
+            // narrowed to int, matching getPigType's INTEGER mapping for IntegerType
+            pigValue = ((BigInteger) value).intValue();
+        }
+        else if (value instanceof ByteBuffer)
+        {
+            pigValue = new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value));
+        }
+        else if (value instanceof UUID)
+        {
+            pigValue = new DataByteArray(UUIDGen.decompose((java.util.UUID) value));
+        }
+        else if (value instanceof Date)
+        {
+            pigValue = DateType.instance.decompose((Date) value).getLong();
+        }
+        else
+        {
+            pigValue = value;
+        }
+        pair.set(position, pigValue);
+    }
+
+    /**
+     * Returns the column family definition stored in this class's UDF-context properties under
+     * {@code signature}, decoded by cfdefFromString.
+     */
+    protected CfDef getCfDef(String signature)
+    {
+        Properties udfProperties = UDFContext.getUDFContext().getUDFProperties(AbstractCassandraStorage.class);
+        String encodedCfDef = udfProperties.getProperty(signature);
+        return cfdefFromString(encodedCfDef);
+    }
+
+    /**
+     * Parses the CfDef's comparator, subcomparator, default validator and key validator into a map
+     * keyed by MarshallerType.
+     *
+     * @throws IOException if any of the type strings cannot be parsed
+     */
+    protected Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
+    {
+        Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class);
+        // parse order: comparator, subcomparator, default validator, key validator
+        marshallers.put(MarshallerType.COMPARATOR, parseType(cfDef.getComparator_type()));
+        marshallers.put(MarshallerType.SUBCOMPARATOR, parseType(cfDef.getSubcomparator_type()));
+        marshallers.put(MarshallerType.DEFAULT_VALIDATOR, parseType(cfDef.getDefault_validation_class()));
+        marshallers.put(MarshallerType.KEY_VALIDATOR, parseType(cfDef.getKey_validation_class()));
+        return marshallers;
+    }
+
+    /**
+     * Maps each column name in the CfDef's column metadata to its parsed validator. Columns without a
+     * (non-empty) validation class are left out.
+     *
+     * @throws IOException wrapping a ConfigurationException or SyntaxException from TypeParser
+     */
+    protected Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
+    {
+        Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
+        for (ColumnDef columnDef : cfDef.getColumn_metadata())
+        {
+            String validationClass = columnDef.getValidation_class();
+            if (validationClass == null || validationClass.isEmpty())
+                continue; // no explicit validator for this column
+
+            try
+            {
+                validators.put(columnDef.name, TypeParser.parse(validationClass));
+            }
+            catch (ConfigurationException e)
+            {
+                throw new IOException(e);
+            }
+            catch (SyntaxException e)
+            {
+                throw new IOException(e);
+            }
+        }
+        return validators;
+    }
+
+    /**
+     * Parses a type string into a Cassandra type. CounterColumnType is mapped to LongType.
+     *
+     * @throws IOException wrapping a ConfigurationException or SyntaxException from TypeParser
+     */
+    protected AbstractType parseType(String type) throws IOException
+    {
+        // always treat counters like longs, specifically CCT.compose is not what we need
+        if ("org.apache.cassandra.db.marshal.CounterColumnType".equals(type))
+            return LongType.instance;
+
+        try
+        {
+            return TypeParser.parse(type);
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+        catch (SyntaxException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    /** Instantiates the input format named by inputFormatClass; a ConfigurationException is rethrown unchecked. */
+    @Override
+    public InputFormat getInputFormat()
+    {
+        InputFormat format;
+        try
+        {
+            format = FBUtilities.construct(inputFormatClass, "inputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return format;
+    }
+
+    /**
+     * Decomposes a URL query string such as {@code key1=value1&key2=value2} into a map.
+     *
+     * Each parameter is split on its first '=' only, so a value may itself contain '='
+     * (e.g. {@code where_clause=k=1}). Empty parameters, as in {@code a=1&&b=2}, are skipped.
+     *
+     * @param query the part of the location URI after '?'
+     * @return a mutable map from parameter name to value
+     * @throws IllegalArgumentException if a non-empty parameter contains no '='
+     */
+    public static Map<String, String> getQueryMap(String query)
+    {
+        Map<String, String> map = new HashMap<String, String>();
+        for (String param : query.split("&"))
+        {
+            if (param.isEmpty())
+                continue;
+            String[] keyValue = param.split("=", 2);
+            if (keyValue.length < 2)
+                throw new IllegalArgumentException("Missing value for URL parameter '" + param + "'");
+            map.put(keyValue[0], keyValue[1]);
+        }
+        return map;
+    }
+
+    /**
+     * Copies connection settings from environment variables into the job configuration and resolves
+     * the input/output format class names.
+     *
+     * The generic PIG_RPC_PORT / PIG_INITIAL_ADDRESS / PIG_PARTITIONER values are applied to both
+     * input and output first. The PIG_INPUT_* / PIG_OUTPUT_* variables then override them.
+     * PIG_INPUT_FORMAT / PIG_OUTPUT_FORMAT fall back to DEFAULT_INPUT_FORMAT / DEFAULT_OUTPUT_FORMAT.
+     */
+    protected void setConnectionInformation() throws IOException
+    {
+        String rpcPort = System.getenv(PIG_RPC_PORT);
+        if (rpcPort != null)
+        {
+            ConfigHelper.setInputRpcPort(conf, rpcPort);
+            ConfigHelper.setOutputRpcPort(conf, rpcPort);
+        }
+        String inputRpcPort = System.getenv(PIG_INPUT_RPC_PORT);
+        if (inputRpcPort != null)
+            ConfigHelper.setInputRpcPort(conf, inputRpcPort);
+        String outputRpcPort = System.getenv(PIG_OUTPUT_RPC_PORT);
+        if (outputRpcPort != null)
+            ConfigHelper.setOutputRpcPort(conf, outputRpcPort);
+
+        String initialAddress = System.getenv(PIG_INITIAL_ADDRESS);
+        if (initialAddress != null)
+        {
+            ConfigHelper.setInputInitialAddress(conf, initialAddress);
+            ConfigHelper.setOutputInitialAddress(conf, initialAddress);
+        }
+        String inputInitialAddress = System.getenv(PIG_INPUT_INITIAL_ADDRESS);
+        if (inputInitialAddress != null)
+            ConfigHelper.setInputInitialAddress(conf, inputInitialAddress);
+        String outputInitialAddress = System.getenv(PIG_OUTPUT_INITIAL_ADDRESS);
+        if (outputInitialAddress != null)
+            ConfigHelper.setOutputInitialAddress(conf, outputInitialAddress);
+
+        String partitioner = System.getenv(PIG_PARTITIONER);
+        if (partitioner != null)
+        {
+            ConfigHelper.setInputPartitioner(conf, partitioner);
+            ConfigHelper.setOutputPartitioner(conf, partitioner);
+        }
+        String inputPartitioner = System.getenv(PIG_INPUT_PARTITIONER);
+        if (inputPartitioner != null)
+            ConfigHelper.setInputPartitioner(conf, inputPartitioner);
+        String outputPartitioner = System.getenv(PIG_OUTPUT_PARTITIONER);
+        if (outputPartitioner != null)
+            ConfigHelper.setOutputPartitioner(conf, outputPartitioner);
+
+        String inputFormat = System.getenv(PIG_INPUT_FORMAT);
+        inputFormatClass = (inputFormat == null) ? DEFAULT_INPUT_FORMAT : getFullyQualifiedClassName(inputFormat);
+        String outputFormat = System.getenv(PIG_OUTPUT_FORMAT);
+        outputFormatClass = (outputFormat == null) ? DEFAULT_OUTPUT_FORMAT : getFullyQualifiedClassName(outputFormat);
+    }
+
+ /**
+  * Expand a short class name into the org.apache.cassandra.hadoop package.
+  * Names that already contain a '.' are treated as fully qualified and returned unchanged.
+  */
+ protected String getFullyQualifiedClassName(String classname)
+ {
+     if (classname.contains("."))
+         return classname;
+     return "org.apache.cassandra.hadoop." + classname;
+ }
+
+ /**
+  * Map a Cassandra marshal type to the Pig DataType used to expose its values.
+  * Any type not listed below is exposed as a BYTEARRAY.
+  */
+ protected byte getPigType(AbstractType type)
+ {
+     // DateType is bad and it should feel bad: it is exposed to Pig as a long
+     if (type instanceof LongType || type instanceof DateType)
+         return DataType.LONG;
+     // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger
+     if (type instanceof IntegerType || type instanceof Int32Type)
+         return DataType.INTEGER;
+     if (type instanceof AsciiType || type instanceof UTF8Type)
+         return DataType.CHARARRAY;
+     if (type instanceof FloatType)
+         return DataType.FLOAT;
+     if (type instanceof DoubleType)
+         return DataType.DOUBLE;
+     if (type instanceof AbstractCompositeType)
+         return DataType.TUPLE;
+     return DataType.BYTEARRAY;
+ }
+
+ /** Statistics are not provided for Cassandra locations; Pig always receives null. */
+ public ResourceStatistics getStatistics(String location, Job job)
+ {
+     // no size or row-count information is computed
+     return null;
+ }
+
+ /**
+  * Cassandra locations are URIs rather than filesystem paths, so they are returned unchanged.
+  */
+ @Override
+ public String relativeToAbsolutePath(String location, Path curDir) throws IOException
+ {
+     // nothing to resolve: curDir is intentionally ignored
+     return location;
+ }
+
+ /** Remember the signature Pig assigns to this loader; it is later used as the schema cache key. */
+ @Override
+ public void setUDFContextSignature(String signature)
+ {
+     loadSignature = signature;
+ }
+
+ /* ---- StoreFunc methods ---- */
+
+ /** Remember the signature Pig assigns to this storer; it is later used as the schema cache key. */
+ public void setStoreFuncUDFContextSignature(String signature)
+ {
+     storeSignature = signature;
+ }
+
+ /** Resolve a store location exactly like a load location, by delegating to relativeToAbsolutePath. */
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+ {
+     String resolved = relativeToAbsolutePath(location, curDir);
+     return resolved;
+ }
+
+ /**
+  * Instantiate the Hadoop OutputFormat named by outputFormatClass.
+  *
+  * @throws RuntimeException wrapping the ConfigurationException if the class cannot be constructed
+  */
+ public OutputFormat getOutputFormat()
+ {
+     OutputFormat format;
+     try
+     {
+         format = FBUtilities.construct(outputFormatClass, "outputformat");
+     }
+     catch (ConfigurationException e)
+     {
+         throw new RuntimeException(e);
+     }
+     return format;
+ }
+
+ /** Accept any schema: values of every type get converted to ByteBuffers (see objToBB). */
+ public void checkSchema(ResourceSchema schema) throws IOException
+ {
+     // intentionally empty: no type validation is performed
+ }
+
+ /**
+  * Convert a Pig value into its ByteBuffer representation.
+  * <ul>
+  * <li>null stays null;</li>
+  * <li>a String is encoded through DataByteArray;</li>
+  * <li>Integer, Long, Float and Double use the matching Cassandra marshal type;</li>
+  * <li>a UUID uses UUIDGen.decompose;</li>
+  * <li>a Tuple becomes a composite value (see tupleToComposite);</li>
+  * <li>anything else must be a DataByteArray, whose bytes are wrapped as-is.</li>
+  * </ul>
+  *
+  * @param o the Pig value, may be null
+  * @return the serialized value, or null for a null input
+  * @throws ClassCastException if the value is of an unsupported type
+  * @throws IllegalArgumentException if a tuple component is null or longer than 0xFFFF bytes
+  */
+ protected ByteBuffer objToBB(Object o)
+ {
+     if (o == null)
+         return null;
+     if (o instanceof java.lang.String)
+         return ByteBuffer.wrap(new DataByteArray((String)o).get());
+     if (o instanceof Integer)
+         return Int32Type.instance.decompose((Integer)o);
+     if (o instanceof Long)
+         return LongType.instance.decompose((Long)o);
+     if (o instanceof Float)
+         return FloatType.instance.decompose((Float)o);
+     if (o instanceof Double)
+         return DoubleType.instance.decompose((Double)o);
+     if (o instanceof UUID)
+         return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
+     if (o instanceof Tuple)
+         return tupleToComposite((Tuple) o);
+
+     return ByteBuffer.wrap(((DataByteArray) o).get());
+ }
+
+ /**
+  * Serialize each element of {@code tuple} with objToBB and pack the results in composite form.
+  * Each component is written as a 2-byte big-endian length, then the component bytes, then a
+  * 0 end-of-component byte.
+  */
+ private ByteBuffer tupleToComposite(Tuple tuple)
+ {
+     List<Object> objects = tuple.getAll();
+     List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+     int totalLength = 0;
+     int index = 0;
+     for (Object sub : objects)
+     {
+         ByteBuffer buffer = objToBB(sub);
+         if (buffer == null)
+             throw new IllegalArgumentException("Composite tuple component " + index + " must not be null");
+         // the length prefix is an unsigned short: longer components would be silently corrupted
+         if (buffer.remaining() > 0xFFFF)
+             throw new IllegalArgumentException("Composite tuple component " + index + " is "
+                                                + buffer.remaining() + " bytes long; the maximum is " + 0xFFFF);
+         serialized.add(buffer);
+         totalLength += 2 + buffer.remaining() + 1;
+         index++;
+     }
+
+     ByteBuffer out = ByteBuffer.allocate(totalLength);
+     for (ByteBuffer bb : serialized)
+     {
+         int length = bb.remaining();
+         out.put((byte) ((length >> 8) & 0xFF));
+         out.put((byte) (length & 0xFF));
+         out.put(bb);
+         out.put((byte) 0);
+     }
+     out.flip();
+     return out;
+ }
+
+ /** No cleanup is performed after a failed store; this is a no-op. */
+ public void cleanupOnFailure(String failure, Job job)
+ {
+     // intentionally empty
+ }
+
+ /* ---- Methods to get the column family schema from Cassandra ---- */
+
+ /**
+  * Fetch the column family definition and cache it, hex-serialized, in the UDF context properties
+  * under {@code signature}. Does nothing if that signature is already cached.
+  * The Thrift connection opened for the lookup is always closed before returning.
+  *
+  * @param signature the load/store signature used as the cache key
+  * @throws RuntimeException on Thrift/IO failures, failed authentication, or when the column family
+  *         does not exist in the keyspace
+  */
+ protected void initSchema(String signature)
+ {
+     Properties properties = UDFContext.getUDFContext().getUDFProperties(AbstractCassandraStorage.class);
+
+     // Only get the schema if we haven't already gotten it
+     if (properties.containsKey(signature))
+         return;
+
+     Cassandra.Client client = null;
+     try
+     {
+         client = ConfigHelper.getClientFromInputAddressList(conf);
+         client.set_keyspace(keyspace);
+
+         if (username != null && password != null)
+         {
+             Map<String, String> credentials = new HashMap<String, String>(2);
+             credentials.put(IAuthenticator.USERNAME_KEY, username);
+             credentials.put(IAuthenticator.PASSWORD_KEY, password);
+
+             try
+             {
+                 client.login(new AuthenticationRequest(credentials));
+             }
+             catch (AuthenticationException e)
+             {
+                 logger.error("Authentication exception: invalid username and/or password");
+                 throw new RuntimeException(e);
+             }
+             catch (AuthorizationException e)
+             {
+                 throw new AssertionError(e); // never actually throws AuthorizationException.
+             }
+         }
+
+         // compose the CfDef for the columfamily
+         CfDef cfDef = getCfDef(client);
+
+         if (cfDef != null)
+             properties.setProperty(signature, cfdefToString(cfDef));
+         else
+             throw new RuntimeException(String.format("Column family '%s' not found in keyspace '%s'",
+                                                      column_family,
+                                                      keyspace));
+     }
+     catch (TException e)
+     {
+         throw new RuntimeException(e);
+     }
+     catch (InvalidRequestException e)
+     {
+         throw new RuntimeException(e);
+     }
+     catch (IOException e)
+     {
+         throw new RuntimeException(e);
+     }
+     catch (UnavailableException e)
+     {
+         throw new RuntimeException(e);
+     }
+     catch (TimedOutException e)
+     {
+         throw new RuntimeException(e);
+     }
+     catch (SchemaDisagreementException e)
+     {
+         throw new RuntimeException(e);
+     }
+     finally
+     {
+         // the client is only used for this lookup: release its connection on every path
+         if (client != null)
+             client.getOutputProtocol().getTransport().close();
+     }
+ }
+
+ /**
+  * Serialize a CfDef with Thrift's binary protocol and hex-encode the bytes, so that it can be
+  * stored as a String property.
+  *
+  * @throws RuntimeException wrapping a TException from serialization
+  */
+ protected static String cfdefToString(CfDef cfDef)
+ {
+     assert cfDef != null;
+     // this is so awful it's kind of cool!
+     byte[] serialized;
+     try
+     {
+         serialized = new TSerializer(new TBinaryProtocol.Factory()).serialize(cfDef);
+     }
+     catch (TException e)
+     {
+         throw new RuntimeException(e);
+     }
+     return Hex.bytesToHex(serialized);
+ }
+
+ /**
+  * Inverse of cfdefToString: hex-decode {@code st} and deserialize it with Thrift's binary protocol.
+  *
+  * @throws RuntimeException wrapping a TException from deserialization
+  */
+ protected static CfDef cfdefFromString(String st)
+ {
+     assert st != null;
+     CfDef cfDef = new CfDef();
+     try
+     {
+         new TDeserializer(new TBinaryProtocol.Factory()).deserialize(cfDef, Hex.hexToBytes(st));
+     }
+     catch (TException e)
+     {
+         throw new RuntimeException(e);
+     }
+     return cfDef;
+ }
+
+ /**
+  * Build a CfDef for the current keyspace/column family from system.schema_columnfamilies.
+  * The column metadata comes from getColumnMetadata. That call is told whether the table has a
+  * non-empty key_aliases list, i.e. whether it should be treated as a CQL3 table.
+  *
+  * @param client an open Thrift client
+  * @return the CfDef, or null when the query returns no rows
+  */
+ protected CfDef getCfDef(Cassandra.Client client)
+     throws InvalidRequestException,
+            UnavailableException,
+            TimedOutException,
+            SchemaDisagreementException,
+            TException,
+            CharacterCodingException
+ {
+     // get CF meta data
+     String query = "SELECT type, " +
+                    " comparator," +
+                    " subcomparator," +
+                    " default_validator, " +
+                    " key_validator," +
+                    " key_aliases " +
+                    "FROM system.schema_columnfamilies " +
+                    "WHERE keyspace_name = '%s' " +
+                    " AND columnfamily_name = '%s' ";
+     String cql = String.format(query, keyspace, column_family);
+
+     CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(cql),
+                                                  Compression.NONE,
+                                                  ConsistencyLevel.ONE);
+     if (result == null || result.rows == null || result.rows.isEmpty())
+         return null;
+
+     // only the first returned row is read; its columns are addressed by SELECT position
+     CqlRow row = result.rows.get(0);
+
+     CfDef cfDef = new CfDef();
+     cfDef.keyspace = keyspace;
+     cfDef.name = column_family;
+     cfDef.column_type = ByteBufferUtil.string(row.columns.get(0).value);
+     cfDef.comparator_type = ByteBufferUtil.string(row.columns.get(1).value);
+     ByteBuffer subComparator = row.columns.get(2).value;
+     if (subComparator != null)
+         cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
+     cfDef.default_validation_class = ByteBufferUtil.string(row.columns.get(3).value);
+     cfDef.key_validation_class = ByteBufferUtil.string(row.columns.get(4).value);
+
+     // a non-empty key_aliases list marks the table as CQL3
+     boolean cql3Table = false;
+     ByteBuffer keyAliases = row.columns.get(5).value;
+     if (keyAliases != null)
+     {
+         List<String> aliases = FBUtilities.fromJsonList(ByteBufferUtil.string(keyAliases));
+         cql3Table = aliases != null && !aliases.isEmpty();
+     }
+
+     // get column meta data
+     cfDef.column_metadata = getColumnMetadata(client, cql3Table);
+     return cfDef;
+ }
+
+ /**
+  * Return the column definitions that getCfDef stores in CfDef.column_metadata.
+  * Each concrete storage class provides its own implementation.
+  *
+  * @param client an open Thrift client (when reached via initSchema, the keyspace has already been set)
+  * @param cql3Table true when the column family's key_aliases list is non-empty (treated as a CQL3 table)
+  * @return the column definitions for the current column family
+  */
+ protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
+ throws InvalidRequestException,
+ UnavailableException,
+ TimedOutException,
+ SchemaDisagreementException,
+ TException,
+ CharacterCodingException;
+
+ /**
+  * Read per-column metadata for the current column family from system.schema_columns.
+  * The metadata read is the column name, its validator, and an optional index type.
+  *
+  * @param client an open Thrift client
+  * @return one ColumnDef per returned row; an empty list when nothing is returned
+  */
+ protected List<ColumnDef> getColumnMeta(Cassandra.Client client)
+     throws InvalidRequestException,
+            UnavailableException,
+            TimedOutException,
+            SchemaDisagreementException,
+            TException,
+            CharacterCodingException
+ {
+     String query = "SELECT column_name, " +
+                    " validator, " +
+                    " index_type " +
+                    "FROM system.schema_columns " +
+                    "WHERE keyspace_name = '%s' " +
+                    " AND columnfamily_name = '%s'";
+
+     CqlResult result = client.execute_cql3_query(
+             ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
+             Compression.NONE,
+             ConsistencyLevel.ONE);
+
+     List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
+     // guard a null result too, like getCfDef/getKeysMeta, instead of dereferencing it
+     if (result == null || result.rows == null || result.rows.isEmpty())
+         return columnDefs;
+
+     for (CqlRow row : result.rows)
+     {
+         ColumnDef cDef = new ColumnDef();
+         cDef.setName(ByteBufferUtil.clone(row.getColumns().get(0).value));
+         cDef.validation_class = ByteBufferUtil.string(row.getColumns().get(1).value);
+         ByteBuffer indexType = row.getColumns().get(2).value;
+         if (indexType != null)
+             cDef.index_type = getIndexType(ByteBufferUtil.string(indexType));
+         columnDefs.add(cDef);
+     }
+     return columnDefs;
+ }
+
+ /**
+  * Build ColumnDefs for the key columns of the current column family from
+  * system.schema_columnfamilies, in this order:
+  * <ol>
+  * <li>partition key columns (key_aliases), typed by the components of key_validator;</li>
+  * <li>clustering columns (column_aliases), typed by the leading components of comparator;</li>
+  * <li>if value_alias is set, the compact value column, typed by default_validator.</li>
+  * </ol>
+  * Partition and clustering columns are typed independently. When names and type components
+  * differ in number, the surplus entries are left untyped rather than throwing
+  * NoSuchElementException.
+  *
+  * @param client an open Thrift client
+  * @return the key ColumnDefs, or null when the query returns no rows
+  */
+ protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
+     throws InvalidRequestException,
+            UnavailableException,
+            TimedOutException,
+            SchemaDisagreementException,
+            TException,
+            IOException
+ {
+     String query = "SELECT key_aliases, " +
+                    " column_aliases, " +
+                    " key_validator, " +
+                    " comparator, " +
+                    " keyspace_name, " +
+                    " value_alias, " +
+                    " default_validator " +
+                    "FROM system.schema_columnfamilies " +
+                    "WHERE keyspace_name = '%s'" +
+                    " AND columnfamily_name = '%s' ";
+
+     CqlResult result = client.execute_cql3_query(
+             ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
+             Compression.NONE,
+             ConsistencyLevel.ONE);
+
+     if (result == null || result.rows == null || result.rows.isEmpty())
+         return null;
+
+     // only the first returned row is read; its columns are addressed by SELECT position
+     CqlRow cqlRow = result.rows.get(0);
+     String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
+     logger.debug("Found ksDef name: {}", name);
+
+     String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
+     logger.debug("partition keys: {}", keyString);
+     List<ColumnDef> partitionKeys = columnDefsFromJsonList(keyString);
+
+     keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
+     logger.debug("cluster keys: {}", keyString);
+     List<ColumnDef> clusterKeys = columnDefsFromJsonList(keyString);
+
+     String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
+     logger.debug("row key validator: {}", validator);
+     assignValidators(partitionKeys, parseType(validator));
+
+     validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
+     logger.debug("cluster key validator: {}", validator);
+     if (!clusterKeys.isEmpty() && validator != null && !validator.isEmpty())
+         assignValidators(clusterKeys, parseType(validator));
+
+     List<ColumnDef> keys = new ArrayList<ColumnDef>(partitionKeys.size() + clusterKeys.size() + 1);
+     keys.addAll(partitionKeys);
+     keys.addAll(clusterKeys);
+
+     // compact value_alias column
+     ByteBuffer valueAlias = cqlRow.columns.get(5).value;
+     if (valueAlias != null)
+     {
+         try
+         {
+             String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
+             logger.debug("default validator: {}", compactValidator);
+             AbstractType<?> defaultValidator = parseType(compactValidator);
+
+             ColumnDef cDef = new ColumnDef();
+             cDef.name = valueAlias;
+             cDef.validation_class = defaultValidator.toString();
+             keys.add(cDef);
+         }
+         catch (Exception e)
+         {
+             // best effort, as before: treat it as "no compact column at value_alias", but leave a trace
+             logger.debug("No compact value column added for value_alias", e);
+         }
+     }
+     return keys;
+ }
+
+ /** Create one untyped ColumnDef per name in the JSON string list {@code json}. */
+ private static List<ColumnDef> columnDefsFromJsonList(String json)
+ {
+     List<ColumnDef> defs = new ArrayList<ColumnDef>();
+     for (String keyName : FBUtilities.fromJsonList(json))
+     {
+         ColumnDef cDef = new ColumnDef();
+         cDef.name = ByteBufferUtil.bytes(keyName);
+         defs.add(cDef);
+     }
+     return defs;
+ }
+
+ /**
+  * Set validation classes on {@code defs} in order.
+  * A CompositeType contributes one component per def. Any other type is assigned to the first def.
+  * Surplus defs or components are left untouched.
+  */
+ private static void assignValidators(List<ColumnDef> defs, AbstractType<?> type)
+ {
+     Iterator<ColumnDef> defItera = defs.iterator();
+     if (type instanceof CompositeType)
+     {
+         Iterator<AbstractType<?>> typeItera = ((CompositeType) type).types.iterator();
+         while (defItera.hasNext() && typeItera.hasNext())
+             defItera.next().validation_class = typeItera.next().toString();
+     }
+     else if (defItera.hasNext())
+     {
+         defItera.next().validation_class = type.toString();
+     }
+ }
+
+ /**
+  * Map an index_type string from system.schema_columns to the Thrift IndexType.
+  * The comparison is case-insensitive and locale-independent.
+  *
+  * @param type the index type name; must not be null
+  * @return KEYS, CUSTOM or COMPOSITES, or null for any other value
+  */
+ protected IndexType getIndexType(String type)
+ {
+     // Locale.ENGLISH: under e.g. a Turkish default locale an upper-case "COMPOSITES" lower-cases
+     // to a dotless i and would silently fall through to null
+     String normalized = type.toLowerCase(java.util.Locale.ENGLISH);
+     if ("keys".equals(normalized))
+         return IndexType.KEYS;
+     if ("custom".equals(normalized))
+         return IndexType.CUSTOM;
+     if ("composites".equals(normalized))
+         return IndexType.COMPOSITES;
+     return null;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a3d2ca/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 6490d05..ed445a2 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -18,34 +18,25 @@
package org.apache.cassandra.hadoop.pig;
import java.io.IOException;
-import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
import java.util.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Hex;
-import org.apache.cassandra.utils.UUIDGen;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
-import org.apache.pig.*;
-import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.Expression;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.*;
import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
@@ -57,30 +48,11 @@ import org.slf4j.LoggerFactory;
*
* A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
*/
-public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
+public class CassandraStorage extends AbstractCassandraStorage
{
- private enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
-
- // system environment variables that can be set to configure connection info:
- // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
- public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
- public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
- public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
- public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
- public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
- public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
- public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
- public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
- public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
- public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
- public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
public final static String PIG_USE_SECONDARY = "PIG_USE_SECONDARY";
- public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
-
- private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
- private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
private final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
@@ -91,22 +63,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
private ByteBuffer slice_end = BOUND;
private boolean slice_reverse = false;
private boolean allow_deletes = false;
- private String username;
- private String password;
- private String keyspace;
- private String column_family;
- private String loadSignature;
- private String storeSignature;
-
- private Configuration conf;
+
private RecordReader<ByteBuffer, Map<ByteBuffer, IColumn>> reader;
private RecordWriter<ByteBuffer, List<Mutation>> writer;
- private String inputFormatClass;
- private String outputFormatClass;
- private int limit;
+
private boolean widerows = false;
private boolean usePartitionFilter = false;
- private int splitSize = 64 * 1024;
+ private int limit;
+
// wide row hacks
private ByteBuffer lastKey;
private Map<ByteBuffer,IColumn> lastRow;
@@ -117,13 +81,13 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
this(1024);
}
- /**
- * @param limit number of columns to fetch in a slice
- */
+ /**
+  * Create a storage that fetches {@code limit} columns per slice.
+  * It uses ColumnFamilyInputFormat / ColumnFamilyOutputFormat as the default Hadoop formats.
+  *
+  * @param limit number of columns to fetch in a slice
+  */
public CassandraStorage(int limit)
{
super();
this.limit = limit;
+ // defaults used by setConnectionInformation when PIG_INPUT_FORMAT / PIG_OUTPUT_FORMAT are unset
+ DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
+ DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
}
public int getLimit()
@@ -131,6 +95,12 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
return limit;
}
+ /**
+  * Keep the RecordReader that Pig supplies for the current split; the split itself is not used.
+  */
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split)
+ {
+     this.reader = reader;
+ }
+
+ /** read wide row*/
public Tuple getNextWide() throws IOException
{
CfDef cfDef = getCfDef(loadSignature);
@@ -229,6 +199,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
}
@Override
+ /** read next row */
public Tuple getNext() throws IOException
{
if (widerows)
@@ -289,315 +260,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
}
}
- /**
- * Deconstructs a composite type to a Tuple.
- */
- private Tuple composeComposite( AbstractCompositeType comparator, ByteBuffer name ) throws IOException
- {
- List<CompositeComponent> result = comparator.deconstruct( name );
-
- Tuple t = TupleFactory.getInstance().newTuple( result.size() );
-
- for( int i = 0; i < result.size(); i++ )
- {
- setTupleValue( t, i, result.get(i).comparator.compose( result.get(i).value ) );
- }
-
- return t;
- }
-
- private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
- {
- Tuple tuple = TupleFactory.getInstance().newTuple(1);
- addKeyToTuple(tuple, key, cfDef, comparator);
- return tuple;
- }
-
- private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
- {
- if( comparator instanceof AbstractCompositeType )
- {
- setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key));
- }
- else
- {
- setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key));
- }
-
- }
-
- private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
- {
- Tuple pair = TupleFactory.getInstance().newTuple(2);
-
- if( comparator instanceof AbstractCompositeType )
- {
- setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,col.name()));
- }
- else
- {
- setTupleValue(pair, 0, comparator.compose(col.name()));
- }
- if (col instanceof Column)
- {
- // standard
- Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-
- if (validators.get(col.name()) == null)
- {
- Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
- setTupleValue(pair, 1, marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value()));
- }
- else
- setTupleValue(pair, 1, validators.get(col.name()).compose(col.value()));
- return pair;
- }
- else
- {
- // super
- ArrayList<Tuple> subcols = new ArrayList<Tuple>();
- for (IColumn subcol : col.getSubColumns())
- subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type())));
-
- pair.set(1, new DefaultDataBag(subcols));
- }
- return pair;
- }
-
- private void setTupleValue(Tuple pair, int position, Object value) throws ExecException
- {
- if (value instanceof BigInteger)
- pair.set(position, ((BigInteger) value).intValue());
- else if (value instanceof ByteBuffer)
- pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
- else if (value instanceof UUID)
- pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value)));
- else if (value instanceof Date)
- pair.set(position, DateType.instance.decompose((Date) value).getLong());
- else
- pair.set(position, value);
- }
-
- private CfDef getCfDef(String signature)
- {
- UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(CassandraStorage.class);
- return cfdefFromString(property.getProperty(signature));
- }
-
- private List<IndexExpression> getIndexExpressions()
- {
- UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(CassandraStorage.class);
- if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
- return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
- else
- return null;
- }
-
- private Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
- {
- Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class);
- AbstractType comparator;
- AbstractType subcomparator;
- AbstractType default_validator;
- AbstractType key_validator;
-
- comparator = parseType(cfDef.getComparator_type());
- subcomparator = parseType(cfDef.getSubcomparator_type());
- default_validator = parseType(cfDef.getDefault_validation_class());
- key_validator = parseType(cfDef.getKey_validation_class());
-
- marshallers.put(MarshallerType.COMPARATOR, comparator);
- marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator);
- marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator);
- marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator);
- return marshallers;
- }
-
- private Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
- {
- Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
- for (ColumnDef cd : cfDef.getColumn_metadata())
- {
- if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
- {
- AbstractType validator = null;
- try
- {
- validator = TypeParser.parse(cd.getValidation_class());
- validators.put(cd.name, validator);
- }
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
- catch (SyntaxException e)
- {
- throw new IOException(e);
- }
- }
- }
- return validators;
- }
-
- private AbstractType parseType(String type) throws IOException
- {
- try
- {
- // always treat counters like longs, specifically CCT.compose is not what we need
- if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
- return LongType.instance;
- return TypeParser.parse(type);
- }
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
- catch (SyntaxException e)
- {
- throw new IOException(e);
- }
- }
-
- @Override
- public InputFormat getInputFormat()
- {
- try
- {
- return FBUtilities.construct(inputFormatClass, "inputformat");
- }
- catch (ConfigurationException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void prepareToRead(RecordReader reader, PigSplit split)
- {
- this.reader = reader;
- }
-
- public static Map<String, String> getQueryMap(String query)
- {
- String[] params = query.split("&");
- Map<String, String> map = new HashMap<String, String>();
- for (String param : params)
- {
- String[] keyValue = param.split("=");
- map.put(keyValue[0], keyValue[1]);
- }
- return map;
- }
-
- private void setLocationFromUri(String location) throws IOException
+ /** set hadoop cassandra connection settings */
+ protected void setConnectionInformation() throws IOException
{
- try
- {
- if (!location.startsWith("cassandra://"))
- throw new Exception("Bad scheme.");
- String[] urlParts = location.split("\\?");
- if (urlParts.length > 1)
- {
- Map<String, String> urlQuery = getQueryMap(urlParts[1]);
- AbstractType comparator = BytesType.instance;
- if (urlQuery.containsKey("comparator"))
- comparator = TypeParser.parse(urlQuery.get("comparator"));
- if (urlQuery.containsKey("slice_start"))
- slice_start = comparator.fromString(urlQuery.get("slice_start"));
- if (urlQuery.containsKey("slice_end"))
- slice_end = comparator.fromString(urlQuery.get("slice_end"));
- if (urlQuery.containsKey("reversed"))
- slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
- if (urlQuery.containsKey("limit"))
- limit = Integer.parseInt(urlQuery.get("limit"));
- if (urlQuery.containsKey("allow_deletes"))
- allow_deletes = Boolean.parseBoolean(urlQuery.get("allow_deletes"));
- if (urlQuery.containsKey("widerows"))
- widerows = Boolean.parseBoolean(urlQuery.get("widerows"));
- if (urlQuery.containsKey("use_secondary"))
- usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
- if (urlQuery.containsKey("split_size"))
- splitSize = Integer.parseInt(urlQuery.get("split_size"));
- }
- String[] parts = urlParts[0].split("/+");
- String[] credentialsAndKeyspace = parts[1].split("@");
- if (credentialsAndKeyspace.length > 1)
- {
- String[] credentials = credentialsAndKeyspace[0].split(":");
- username = credentials[0];
- password = credentials[1];
- keyspace = credentialsAndKeyspace[1];
- }
- else
- {
- keyspace = parts[1];
- }
- column_family = parts[2];
- }
- catch (Exception e)
- {
- throw new IOException("Expected 'cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1][&allow_deletes=true][widerows=true][use_secondary=true]]': " + e.getMessage());
- }
- }
-
- private void setConnectionInformation() throws IOException
- {
- if (System.getenv(PIG_RPC_PORT) != null)
- {
- ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
- ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
- }
-
- if (System.getenv(PIG_INPUT_RPC_PORT) != null)
- ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT));
- if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
- ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT));
-
- if (System.getenv(PIG_INITIAL_ADDRESS) != null)
- {
- ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
- ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
- }
- if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
- ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS));
- if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
- ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
-
- if (System.getenv(PIG_PARTITIONER) != null)
- {
- ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
- ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
- }
- if(System.getenv(PIG_INPUT_PARTITIONER) != null)
- ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
- if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
- ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
- if (System.getenv(PIG_INPUT_FORMAT) != null)
- inputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_INPUT_FORMAT));
- else
- inputFormatClass = DEFAULT_INPUT_FORMAT;
- if (System.getenv(PIG_OUTPUT_FORMAT) != null)
- outputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_OUTPUT_FORMAT));
- else
- outputFormatClass = DEFAULT_OUTPUT_FORMAT;
+ super.setConnectionInformation();
if (System.getenv(PIG_ALLOW_DELETES) != null)
allow_deletes = Boolean.parseBoolean(System.getenv(PIG_ALLOW_DELETES));
}
- private String getFullyQualifiedClassName(String classname)
- {
- return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
- }
-
- @Override
+ /** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
conf = job.getConfiguration();
-
- // don't combine mappers to a single mapper per node
- conf.setBoolean("pig.noSplitCombination", true);
setLocationFromUri(location);
if (ConfigHelper.getInputSlicePredicate(conf) == null)
@@ -616,20 +290,22 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
{
ConfigHelper.setInputSplitSize(conf, Integer.valueOf(System.getenv(PIG_INPUT_SPLIT_SIZE)));
}
- catch(NumberFormatException e)
+ catch (NumberFormatException e)
{
throw new RuntimeException("PIG_INPUT_SPLIT_SIZE is not a number", e);
}
- }
+ }
if (usePartitionFilter && getIndexExpressions() != null)
ConfigHelper.setInputRange(conf, getIndexExpressions());
if (username != null && password != null)
ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
-
+
if (splitSize > 0)
ConfigHelper.setInputSplitSize(conf, splitSize);
+ if (partitionerClass!= null)
+ ConfigHelper.setInputPartitioner(conf, partitionerClass);
ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows);
setConnectionInformation();
@@ -645,6 +321,40 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
initSchema(loadSignature);
}
+ /** Configure the job for writing: parse the cassandra:// URI, apply output credentials/partitioner/column family, check that rpc port, initial address and partitioner are set (IOException otherwise), then load the schema for storeSignature */
+ public void setStoreLocation(String location, Job job) throws IOException
+ {
+ conf = job.getConfiguration();
+
+ // don't combine mappers to a single mapper per node (sets Pig's pig.noSplitCombination flag on this job's configuration)
+ conf.setBoolean("pig.noSplitCombination", true);
+ setLocationFromUri(location);
+
+ if (username != null && password != null)
+ ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
+ if (splitSize > 0) // NOTE(review): this is an input-side setting applied on the store path; confirm it is intended
+ ConfigHelper.setInputSplitSize(conf, splitSize);
+ if (partitionerClass!= null)
+ ConfigHelper.setOutputPartitioner(conf, partitionerClass);
+
+ ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
+ setConnectionInformation();
+
+ if (ConfigHelper.getOutputRpcPort(conf) == 0)
+ throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
+ if (ConfigHelper.getOutputInitialAddress(conf) == null)
+ throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+ if (ConfigHelper.getOutputPartitioner(conf) == null)
+ throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+
+ // we have to do this again here for the check in writeColumnsFromTuple; when PIG_USE_SECONDARY is set it overrides the URI's use_secondary value
+ if (System.getenv(PIG_USE_SECONDARY) != null)
+ usePartitionFilter = Boolean.valueOf(System.getenv(PIG_USE_SECONDARY));
+
+ initSchema(storeSignature);
+ }
+
+ /** define the schema */
public ResourceSchema getSchema(String location, Job job) throws IOException
{
setLocation(location, job);
@@ -695,30 +405,34 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
// add the key first, then the indexed columns, and finally the bag
allSchemaFields.add(keyFieldSchema);
- // defined validators/indexes
- for (ColumnDef cdef : cfDef.column_metadata)
+ if (!widerows)
{
- // make a new tuple for each col/val pair
- ResourceSchema innerTupleSchema = new ResourceSchema();
- ResourceFieldSchema innerTupleField = new ResourceFieldSchema();
- innerTupleField.setType(DataType.TUPLE);
- innerTupleField.setSchema(innerTupleSchema);
- innerTupleField.setName(new String(cdef.getName()));
-
- ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
- idxColSchema.setName("name");
- idxColSchema.setType(getPigType(marshallers.get(MarshallerType.COMPARATOR)));
-
- ResourceFieldSchema valSchema = new ResourceFieldSchema();
- AbstractType validator = validators.get(cdef.name);
- if (validator == null)
- validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
- valSchema.setName("value");
- valSchema.setType(getPigType(validator));
-
- innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
- allSchemaFields.add(innerTupleField);
+ // defined validators/indexes
+ for (ColumnDef cdef : cfDef.column_metadata)
+ {
+ // make a new tuple for each col/val pair
+ ResourceSchema innerTupleSchema = new ResourceSchema();
+ ResourceFieldSchema innerTupleField = new ResourceFieldSchema();
+ innerTupleField.setType(DataType.TUPLE);
+ innerTupleField.setSchema(innerTupleSchema);
+ innerTupleField.setName(new String(cdef.getName()));
+
+ ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
+ idxColSchema.setName("name");
+ idxColSchema.setType(getPigType(marshallers.get(MarshallerType.COMPARATOR)));
+
+ ResourceFieldSchema valSchema = new ResourceFieldSchema();
+ AbstractType validator = validators.get(cdef.name);
+ if (validator == null)
+ validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
+ valSchema.setName("value");
+ valSchema.setType(getPigType(validator));
+
+ innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
+ allSchemaFields.add(innerTupleField);
+ }
}
+
// bag at the end for unknown columns
allSchemaFields.add(bagField);
@@ -741,31 +455,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
return schema;
}
- private byte getPigType(AbstractType type)
- {
- if (type instanceof LongType || type instanceof DateType) // DateType is bad and it should feel bad
- return DataType.LONG;
- else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger
- return DataType.INTEGER;
- else if (type instanceof AsciiType)
- return DataType.CHARARRAY;
- else if (type instanceof UTF8Type)
- return DataType.CHARARRAY;
- else if (type instanceof FloatType)
- return DataType.FLOAT;
- else if (type instanceof DoubleType)
- return DataType.DOUBLE;
- else if (type instanceof AbstractCompositeType )
- return DataType.TUPLE;
-
- return DataType.BYTEARRAY;
- }
-
- public ResourceStatistics getStatistics(String location, Job job)
- {
- return null;
- }
-
+ /** return partition keys */
public String[] getPartitionKeys(String location, Job job)
{
if (!usePartitionFilter)
@@ -779,170 +469,21 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
return partitionKeys;
}
+ /** Convert the Pig partition filter to index expressions and save them, serialized, in the UDF properties under PARTITION_FILTER_SIGNATURE (getIndexExpressions reads them back) */
public void setPartitionFilter(Expression partitionFilter)
{
UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(CassandraStorage.class);
+ Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
}
- private List<IndexExpression> filterToIndexExpressions(Expression expression)
- {
- List<IndexExpression> indexExpressions = new ArrayList<IndexExpression>();
- Expression.BinaryExpression be = (Expression.BinaryExpression)expression;
- ByteBuffer name = ByteBuffer.wrap(be.getLhs().toString().getBytes());
- ByteBuffer value = ByteBuffer.wrap(be.getRhs().toString().getBytes());
- switch (expression.getOpType())
- {
- case OP_EQ:
- indexExpressions.add(new IndexExpression(name, IndexOperator.EQ, value));
- break;
- case OP_GE:
- indexExpressions.add(new IndexExpression(name, IndexOperator.GTE, value));
- break;
- case OP_GT:
- indexExpressions.add(new IndexExpression(name, IndexOperator.GT, value));
- break;
- case OP_LE:
- indexExpressions.add(new IndexExpression(name, IndexOperator.LTE, value));
- break;
- case OP_LT:
- indexExpressions.add(new IndexExpression(name, IndexOperator.LT, value));
- break;
- case OP_AND:
- indexExpressions.addAll(filterToIndexExpressions(be.getLhs()));
- indexExpressions.addAll(filterToIndexExpressions(be.getRhs()));
- break;
- default:
- throw new RuntimeException("Unsupported expression type: " + expression.getOpType().name());
- }
- return indexExpressions;
- }
-
- private List<ColumnDef> getIndexes()
- {
- CfDef cfdef = getCfDef(loadSignature);
- List<ColumnDef> indexes = new ArrayList<ColumnDef>();
- for (ColumnDef cdef : cfdef.column_metadata)
- {
- if (cdef.index_type != null)
- indexes.add(cdef);
- }
- return indexes;
- }
-
- @Override
- public String relativeToAbsolutePath(String location, Path curDir) throws IOException
- {
- return location;
- }
-
- @Override
- public void setUDFContextSignature(String signature)
- {
- this.loadSignature = signature;
- }
-
- /* StoreFunc methods */
- public void setStoreFuncUDFContextSignature(String signature)
- {
- this.storeSignature = signature;
- }
-
- public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
- {
- return relativeToAbsolutePath(location, curDir);
- }
-
- public void setStoreLocation(String location, Job job) throws IOException
- {
- conf = job.getConfiguration();
- setLocationFromUri(location);
-
- if (username != null && password != null)
- ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
-
- ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
- setConnectionInformation();
-
- if (ConfigHelper.getOutputRpcPort(conf) == 0)
- throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
- if (ConfigHelper.getOutputInitialAddress(conf) == null)
- throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
- if (ConfigHelper.getOutputPartitioner(conf) == null)
- throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
-
- // we have to do this again here for the check in writeColumnsFromTuple
- if (System.getenv(PIG_USE_SECONDARY) != null)
- usePartitionFilter = Boolean.valueOf(System.getenv(PIG_USE_SECONDARY));
-
- initSchema(storeSignature);
- }
-
- public OutputFormat getOutputFormat()
- {
- try
- {
- return FBUtilities.construct(outputFormatClass, "outputformat");
- }
- catch (ConfigurationException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void checkSchema(ResourceSchema schema) throws IOException
- {
- // we don't care about types, they all get casted to ByteBuffers
- }
-
+ /** Keep the given RecordWriter in this.writer for use by the later write path */
public void prepareToWrite(RecordWriter writer)
{
this.writer = writer;
}
- private ByteBuffer objToBB(Object o)
- {
- if (o == null)
- return (ByteBuffer)o;
- if (o instanceof java.lang.String)
- return ByteBuffer.wrap(new DataByteArray((String)o).get());
- if (o instanceof Integer)
- return Int32Type.instance.decompose((Integer)o);
- if (o instanceof Long)
- return LongType.instance.decompose((Long)o);
- if (o instanceof Float)
- return FloatType.instance.decompose((Float)o);
- if (o instanceof Double)
- return DoubleType.instance.decompose((Double)o);
- if (o instanceof UUID)
- return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
- if(o instanceof Tuple) {
- List<Object> objects = ((Tuple)o).getAll();
- List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
- int totalLength = 0;
- for(Object sub : objects)
- {
- ByteBuffer buffer = objToBB(sub);
- serialized.add(buffer);
- totalLength += 2 + buffer.remaining() + 1;
- }
- ByteBuffer out = ByteBuffer.allocate(totalLength);
- for (ByteBuffer bb : serialized)
- {
- int length = bb.remaining();
- out.put((byte) ((length >> 8) & 0xFF));
- out.put((byte) (length & 0xFF));
- out.put(bb);
- out.put((byte) 0);
- }
- out.flip();
- return out;
- }
-
- return ByteBuffer.wrap(((DataByteArray) o).get());
- }
-
+ /** write next row */
public void putNext(Tuple t) throws IOException
{
/*
@@ -971,6 +512,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
throw new IOException("Second argument in output must be a tuple or bag");
}
+ /** write tuple data to cassandra */
private void writeColumnsFromTuple(ByteBuffer key, Tuple t, int offset) throws IOException
{
ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
@@ -993,6 +535,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
writeMutations(key, mutationList);
}
+ /** compose Cassandra mutation from tuple */
private Mutation mutationFromTuple(Tuple t) throws IOException
{
Mutation mutation = new Mutation();
@@ -1021,6 +564,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
return mutation;
}
+ /** write bag data to Cassandra */
private void writeColumnsFromBag(ByteBuffer key, DefaultDataBag bag) throws IOException
{
List<Mutation> mutationList = new ArrayList<Mutation>();
@@ -1074,6 +618,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
writeMutations(key, mutationList);
}
+ /** write mutation to Cassandra */
private void writeMutations(ByteBuffer key, List<Mutation> mutations) throws IOException
{
try
@@ -1086,90 +631,65 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
}
}
- public void cleanupOnFailure(String failure, Job job)
+ /** Translate a Pig filter (EQ/GE/GT/LE/LT comparisons, possibly joined by AND) into Cassandra IndexExpressions; any other operator throws RuntimeException */
+ private List<IndexExpression> filterToIndexExpressions(Expression expression)
{
+ List<IndexExpression> indexExpressions = new ArrayList<IndexExpression>();
+ Expression.BinaryExpression be = (Expression.BinaryExpression)expression; // NOTE(review): cast precedes the op-type switch, so a non-binary expression fails here with ClassCastException
+ ByteBuffer name = ByteBuffer.wrap(be.getLhs().toString().getBytes()); // NOTE(review): getBytes() uses the platform default charset
+ ByteBuffer value = ByteBuffer.wrap(be.getRhs().toString().getBytes());
+ switch (expression.getOpType())
+ {
+ case OP_EQ:
+ indexExpressions.add(new IndexExpression(name, IndexOperator.EQ, value));
+ break;
+ case OP_GE:
+ indexExpressions.add(new IndexExpression(name, IndexOperator.GTE, value));
+ break;
+ case OP_GT:
+ indexExpressions.add(new IndexExpression(name, IndexOperator.GT, value));
+ break;
+ case OP_LE:
+ indexExpressions.add(new IndexExpression(name, IndexOperator.LTE, value));
+ break;
+ case OP_LT:
+ indexExpressions.add(new IndexExpression(name, IndexOperator.LT, value));
+ break;
+ case OP_AND:
+ indexExpressions.addAll(filterToIndexExpressions(be.getLhs()));
+ indexExpressions.addAll(filterToIndexExpressions(be.getRhs()));
+ break;
+ default:
+ throw new RuntimeException("Unsupported expression type: " + expression.getOpType().name());
+ }
+ return indexExpressions;
}
- /* Methods to get the column family schema from Cassandra */
-
- private void initSchema(String signature)
+ /** Return the ColumnDefs of the load-side column family (schema looked up via loadSignature) that have an index_type set */
+ private List<ColumnDef> getIndexes()
{
- Properties properties = UDFContext.getUDFContext().getUDFProperties(CassandraStorage.class);
-
- // Only get the schema if we haven't already gotten it
- if (!properties.containsKey(signature))
+ CfDef cfdef = getCfDef(loadSignature);
+ List<ColumnDef> indexes = new ArrayList<ColumnDef>();
+ for (ColumnDef cdef : cfdef.column_metadata)
{
- try
- {
- Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
- client.set_keyspace(keyspace);
-
- if (username != null && password != null)
- {
- Map<String, String> credentials = new HashMap<String, String>(2);
- credentials.put(IAuthenticator.USERNAME_KEY, username);
- credentials.put(IAuthenticator.PASSWORD_KEY, password);
-
- try
- {
- client.login(new AuthenticationRequest(credentials));
- }
- catch (AuthenticationException e)
- {
- logger.error("Authentication exception: invalid username and/or password");
- throw new RuntimeException(e);
- }
- catch (AuthorizationException e)
- {
- throw new AssertionError(e); // never actually throws AuthorizationException.
- }
- }
-
- CfDef cfDef = null;
- KsDef ksDef = client.describe_keyspace(keyspace);
- List<CfDef> defs = ksDef.getCf_defs();
- for (CfDef def : defs)
- {
- if (column_family.equalsIgnoreCase(def.getName()))
- {
- cfDef = def;
- break;
- }
- }
- if (cfDef != null)
- properties.setProperty(signature, cfdefToString(cfDef));
- else
- throw new RuntimeException(String.format("Column family '%s' not found in keyspace '%s'",
- column_family,
- keyspace));
- }
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
- catch (InvalidRequestException e)
- {
- throw new RuntimeException(e);
- }
- catch (NotFoundException e)
- {
- throw new RuntimeException(e);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ if (cdef.index_type != null)
+ indexes.add(cdef);
}
+ return indexes;
}
- private static String cfdefToString(CfDef cfDef)
+ /** convert a list of index expression to string */
+ private static String indexExpressionsToString(List<IndexExpression> indexExpressions)
{
- assert cfDef != null;
- // this is so awful it's kind of cool!
+ assert indexExpressions != null;
+ // oh, you thought cfdefToString was awful?
+ IndexClause indexClause = new IndexClause();
+ indexClause.setExpressions(indexExpressions);
+ indexClause.setStart_key("".getBytes());
TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
try
{
- return Hex.bytesToHex(serializer.serialize(cfDef));
+ return Hex.bytesToHex(serializer.serialize(indexClause));
}
catch (TException e)
{
@@ -1177,54 +697,130 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
}
}
- private static CfDef cfdefFromString(String st)
+ /** Decode a hex string holding a Thrift-binary-serialized IndexClause and return its expressions; a TException is rethrown as RuntimeException */
+ private static List<IndexExpression> indexExpressionsFromString(String ie)
{
- assert st != null;
+ assert ie != null;
TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
- CfDef cfDef = new CfDef();
+ IndexClause indexClause = new IndexClause();
try
{
- deserializer.deserialize(cfDef, Hex.hexToBytes(st));
+ deserializer.deserialize(indexClause, Hex.hexToBytes(ie));
}
catch (TException e)
{
throw new RuntimeException(e);
}
- return cfDef;
+ return indexClause.getExpressions();
}
- private static String indexExpressionsToString(List<IndexExpression> indexExpressions)
+ /** Return the index expressions stored under PARTITION_FILTER_SIGNATURE in the UDF properties, or null when no partition filter has been set */
+ private List<IndexExpression> getIndexExpressions()
{
- assert indexExpressions != null;
- // oh, you thought cfdefToString was awful?
- IndexClause indexClause = new IndexClause();
- indexClause.setExpressions(indexExpressions);
- indexClause.setStart_key("".getBytes());
- TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
- try
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
+ if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
+ return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
+ else
+ return null;
+ }
+
+ /** Return column metadata for the column family: an empty list for CQL3 tables, otherwise whatever getColumnMeta(client) returns */
+ protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
+ throws InvalidRequestException,
+ UnavailableException,
+ TimedOutException,
+ SchemaDisagreementException,
+ TException,
+ CharacterCodingException
+ {
+ if (cql3Table)
+ return new ArrayList<ColumnDef>();
+
+ return getColumnMeta(client);
+ }
+
+ /** Create a one-field tuple whose field 0 is the row key decoded by addKeyToTuple */
+ private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
+ {
+ Tuple tuple = TupleFactory.getInstance().newTuple(1);
+ addKeyToTuple(tuple, key, cfDef, comparator);
+ return tuple;
+ }
+
+ /** Decode key into field 0 of tuple: via composeComposite when comparator is composite, otherwise with the column family's KEY_VALIDATOR marshaller */
+ private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
+ {
+ if( comparator instanceof AbstractCompositeType )
{
- return Hex.bytesToHex(serializer.serialize(indexClause));
+ setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key));
}
- catch (TException e)
+ else
{
- throw new RuntimeException(e);
+ setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key));
}
+
}
- private static List<IndexExpression> indexExpressionsFromString(String ie)
+ /** Parse cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>
+ * [&reversed=true][&limit=1][&allow_deletes=true][&widerows=true]
+ * [&use_secondary=true][&comparator=<comparator>][&split_size=<size>][&partitioner=<partitioner>]] into this storage's fields; any parse failure is rethrown as IOException (with the cause attached) */
+ private void setLocationFromUri(String location) throws IOException
{
- assert ie != null;
- TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
- IndexClause indexClause = new IndexClause();
try
{
- deserializer.deserialize(indexClause, Hex.hexToBytes(ie));
+ if (!location.startsWith("cassandra://"))
+ throw new Exception("Bad scheme: " + location);
+
+ String[] urlParts = location.split("\\?");
+ if (urlParts.length > 1)
+ {
+ Map<String, String> urlQuery = getQueryMap(urlParts[1]);
+ AbstractType comparator = BytesType.instance;
+ if (urlQuery.containsKey("comparator"))
+ comparator = TypeParser.parse(urlQuery.get("comparator"));
+ if (urlQuery.containsKey("slice_start"))
+ slice_start = comparator.fromString(urlQuery.get("slice_start"));
+ if (urlQuery.containsKey("slice_end"))
+ slice_end = comparator.fromString(urlQuery.get("slice_end"));
+ if (urlQuery.containsKey("reversed"))
+ slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
+ if (urlQuery.containsKey("limit"))
+ limit = Integer.parseInt(urlQuery.get("limit"));
+ if (urlQuery.containsKey("allow_deletes"))
+ allow_deletes = Boolean.parseBoolean(urlQuery.get("allow_deletes"));
+ if (urlQuery.containsKey("widerows"))
+ widerows = Boolean.parseBoolean(urlQuery.get("widerows"));
+ if (urlQuery.containsKey("use_secondary"))
+ usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
+ if (urlQuery.containsKey("split_size"))
+ splitSize = Integer.parseInt(urlQuery.get("split_size"));
+ if (urlQuery.containsKey("partitioner"))
+ partitionerClass = urlQuery.get("partitioner");
+ }
+ String[] parts = urlParts[0].split("/+");
+ String[] credentialsAndKeyspace = parts[1].split("@");
+ if (credentialsAndKeyspace.length > 1)
+ {
+ String[] credentials = credentialsAndKeyspace[0].split(":", 2); // limit 2: keep any ':' inside the password
+ username = credentials[0];
+ password = credentials[1];
+ keyspace = credentialsAndKeyspace[1];
+ }
+ else
+ {
+ keyspace = parts[1];
+ }
+ column_family = parts[2];
}
- catch (TException e)
+ catch (Exception e)
{
- throw new RuntimeException(e);
+ throw new IOException("Expected 'cassandra://[username:password@]<keyspace>/<columnfamily>" +
+ "[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]" +
+ "[&allow_deletes=true][&widerows=true][&use_secondary=true]" +
+ "[&comparator=<comparator>][&split_size=<size>][&partitioner=<partitioner>]]': " + e.getMessage(), e);
}
- return indexClause.getExpressions();
}
+
}