Posted to commits@cassandra.apache.org by al...@apache.org on 2015/05/05 22:56:59 UTC
[2/4] cassandra git commit: Remove Thrift dependencies in bundled tools
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 0a64c87..1ad80b7 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -18,30 +18,46 @@
package org.apache.cassandra.hadoop.pig;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.*;
-import org.apache.cassandra.hadoop.HadoopCompat;
-import org.apache.cassandra.db.Cell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.auth.PasswordAuthenticator;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.*;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
@@ -52,7 +68,8 @@ import org.apache.thrift.protocol.TBinaryProtocol;
*
* A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
*/
-public class CassandraStorage extends AbstractCassandraStorage
+@Deprecated
+public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
{
public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
@@ -71,6 +88,28 @@ public class CassandraStorage extends AbstractCassandraStorage
private boolean widerows = false;
private int limit;
+
+ protected String DEFAULT_INPUT_FORMAT;
+ protected String DEFAULT_OUTPUT_FORMAT;
+
+ protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
+
+ protected String username;
+ protected String password;
+ protected String keyspace;
+ protected String column_family;
+ protected String loadSignature;
+ protected String storeSignature;
+
+ protected Configuration conf;
+ protected String inputFormatClass;
+ protected String outputFormatClass;
+ protected int splitSize = 64 * 1024;
+ protected String partitionerClass;
+ protected boolean usePartitionFilter = false;
+ protected String initHostAddress;
+ protected String rpcPort;
+ protected int nativeProtocolVersion = 1;
// wide row hacks
private ByteBuffer lastKey;
@@ -104,8 +143,7 @@ public class CassandraStorage extends AbstractCassandraStorage
/** read wide row*/
public Tuple getNextWide() throws IOException
{
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = null;
Tuple tuple = null;
DefaultDataBag bag = new DefaultDataBag();
@@ -128,7 +166,7 @@ public class CassandraStorage extends AbstractCassandraStorage
}
for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
@@ -166,7 +204,7 @@ public class CassandraStorage extends AbstractCassandraStorage
addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
lastKey = key;
@@ -183,14 +221,14 @@ public class CassandraStorage extends AbstractCassandraStorage
{
for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
}
for (Map.Entry<ByteBuffer, Cell> entry : row.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
}
}
@@ -200,7 +238,6 @@ public class CassandraStorage extends AbstractCassandraStorage
}
}
- @Override
/** read next row */
public Tuple getNext() throws IOException
{
@@ -212,8 +249,7 @@ public class CassandraStorage extends AbstractCassandraStorage
if (!reader.nextKeyValue())
return null;
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = reader.getCurrentKey();
Map<ByteBuffer, Cell> cf = reader.getCurrentValue();
assert key != null && cf != null;
@@ -240,7 +276,7 @@ public class CassandraStorage extends AbstractCassandraStorage
}
if (hasColumn)
{
- tuple.append(columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type())));
+ tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
}
else if (!cql3Table)
{ // otherwise, we need to add an empty tuple to take its place
@@ -252,7 +288,7 @@ public class CassandraStorage extends AbstractCassandraStorage
for (Map.Entry<ByteBuffer, Cell> entry : cf.entrySet())
{
if (!added.containsKey(entry.getKey()))
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
// finally, special top-level indexes if needed
@@ -260,7 +296,7 @@ public class CassandraStorage extends AbstractCassandraStorage
{
for (ColumnDef cdef : getIndexes())
{
- Tuple throwaway = columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type()));
+ Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
tuple.append(throwaway.get(1));
}
}
@@ -272,14 +308,57 @@ public class CassandraStorage extends AbstractCassandraStorage
}
}
+ /** write next row */
+ public void putNext(Tuple t) throws IOException
+ {
+ /*
+ We support two cases for output:
+ First, the original output:
+ (key, (name, value), (name,value), {(name,value)}) (tuples or bag is optional)
+ For supers, we only accept the original output.
+ */
+
+ if (t.size() < 1)
+ {
+ // simply nothing here, we can't even delete without a key
+ logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
+ return;
+ }
+ ByteBuffer key = objToBB(t.get(0));
+ if (t.getType(1) == DataType.TUPLE)
+ writeColumnsFromTuple(key, t, 1);
+ else if (t.getType(1) == DataType.BAG)
+ {
+ if (t.size() > 2)
+ throw new IOException("No arguments allowed after bag");
+ writeColumnsFromBag(key, (DataBag) t.get(1));
+ }
+ else
+ throw new IOException("Second argument in output must be a tuple or bag");
+ }
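
For context, putNext() accepts a row key followed either by per-column tuples or by a single bag of column tuples (with nothing after the bag). A minimal sketch of both shapes built with Pig's tuple and bag factories; the keys, column names, and values are illustrative assumptions, not part of this patch:

    import java.util.Arrays;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class PutNextShapes
    {
        public static void main(String[] args) throws Exception
        {
            TupleFactory tf = TupleFactory.getInstance();

            // shape 1: (key, (name, value), (name, value)) -- one tuple per column
            Tuple row = tf.newTuple();
            row.append("rowkey1");                                      // t.get(0): the row key
            row.append(tf.newTuple(Arrays.<Object>asList("c1", "v1"))); // t.getType(1) == DataType.TUPLE
            row.append(tf.newTuple(Arrays.<Object>asList("c2", "v2")));

            // shape 2: (key, {(name, value), ...}) -- a bag, which must be the last field
            DataBag bag = BagFactory.getInstance().newDefaultBag();
            bag.add(tf.newTuple(Arrays.<Object>asList("c1", "v1")));
            Tuple row2 = tf.newTuple();
            row2.append("rowkey2");
            row2.append(bag);                                           // t.getType(1) == DataType.BAG
        }
    }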
+
/** set hadoop cassandra connection settings */
protected void setConnectionInformation() throws IOException
{
- super.setConnectionInformation();
+ StorageHelper.setConnectionInformation(conf);
+ if (System.getenv(StorageHelper.PIG_INPUT_FORMAT) != null)
+ inputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_INPUT_FORMAT));
+ else
+ inputFormatClass = DEFAULT_INPUT_FORMAT;
+ if (System.getenv(StorageHelper.PIG_OUTPUT_FORMAT) != null)
+ outputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_OUTPUT_FORMAT));
+ else
+ outputFormatClass = DEFAULT_OUTPUT_FORMAT;
if (System.getenv(PIG_ALLOW_DELETES) != null)
allow_deletes = Boolean.parseBoolean(System.getenv(PIG_ALLOW_DELETES));
}
+    /** get the fully qualified class name */
+ protected String getFullyQualifiedClassName(String classname)
+ {
+ return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
+ }
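
A hedged illustration of the shorthand this enables; the short name is resolved against org.apache.cassandra.hadoop, and the qualified name (hypothetical here) passes through untouched:

    getFullyQualifiedClassName("ColumnFamilyInputFormat");
    // -> "org.apache.cassandra.hadoop.ColumnFamilyInputFormat"
    getFullyQualifiedClassName("com.example.MyInputFormat");
    // -> "com.example.MyInputFormat" (unchanged)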
+
/** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
@@ -296,11 +375,11 @@ public class CassandraStorage extends AbstractCassandraStorage
widerows = Boolean.parseBoolean(System.getenv(PIG_WIDEROW_INPUT));
if (System.getenv(PIG_USE_SECONDARY) != null)
usePartitionFilter = Boolean.parseBoolean(System.getenv(PIG_USE_SECONDARY));
- if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+ if (System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE) != null)
{
try
{
- ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+ ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE)));
}
catch (NumberFormatException e)
{
@@ -380,12 +459,67 @@ public class CassandraStorage extends AbstractCassandraStorage
initSchema(storeSignature);
}
+ /** Methods to get the column family schema from Cassandra */
+ protected void initSchema(String signature) throws IOException
+ {
+ Properties properties = UDFContext.getUDFContext().getUDFProperties(CassandraStorage.class);
+
+ // Only get the schema if we haven't already gotten it
+ if (!properties.containsKey(signature))
+ {
+ try
+ {
+ Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
+ client.set_keyspace(keyspace);
+
+ if (username != null && password != null)
+ {
+ Map<String, String> credentials = new HashMap<String, String>(2);
+ credentials.put(PasswordAuthenticator.USERNAME_KEY, username);
+ credentials.put(PasswordAuthenticator.PASSWORD_KEY, password);
+
+ try
+ {
+ client.login(new AuthenticationRequest(credentials));
+ }
+ catch (AuthenticationException e)
+ {
+ logger.error("Authentication exception: invalid username and/or password");
+ throw new IOException(e);
+ }
+ }
+
+            // compose the CfDef for the column family
+ CfDef cfDef = getCfDef(client);
+
+ if (cfDef != null)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(cfdefToString(cfDef));
+ properties.setProperty(signature, sb.toString());
+ }
+ else
+ throw new IOException(String.format("Table '%s' not found in keyspace '%s'",
+ column_family,
+ keyspace));
+ }
+ catch (Exception e)
+ {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ public void checkSchema(ResourceSchema schema) throws IOException
+ {
+        // we don't care about types, they all get cast to ByteBuffers
+ }
+
/** define the schema */
public ResourceSchema getSchema(String location, Job job) throws IOException
{
setLocation(location, job);
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
if (cfDef.column_type.equals("Super"))
return null;
/*
@@ -405,7 +539,7 @@ public class CassandraStorage extends AbstractCassandraStorage
// add key
ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
keyFieldSchema.setName("key");
- keyFieldSchema.setType(getPigType(marshallers.get(MarshallerType.KEY_VALIDATOR)));
+ keyFieldSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.KEY_VALIDATOR)));
ResourceSchema bagSchema = new ResourceSchema();
ResourceFieldSchema bagField = new ResourceFieldSchema();
@@ -419,8 +553,8 @@ public class CassandraStorage extends AbstractCassandraStorage
ResourceFieldSchema bagvalSchema = new ResourceFieldSchema();
bagcolSchema.setName("name");
bagvalSchema.setName("value");
- bagcolSchema.setType(getPigType(marshallers.get(MarshallerType.COMPARATOR)));
- bagvalSchema.setType(getPigType(marshallers.get(MarshallerType.DEFAULT_VALIDATOR)));
+ bagcolSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR)));
+ bagvalSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.DEFAULT_VALIDATOR)));
bagTupleSchema.setFields(new ResourceFieldSchema[] { bagcolSchema, bagvalSchema });
bagTupleField.setSchema(bagTupleSchema);
bagSchema.setFields(new ResourceFieldSchema[] { bagTupleField });
@@ -431,7 +565,7 @@ public class CassandraStorage extends AbstractCassandraStorage
// add the key first, then the indexed columns, and finally the bag
allSchemaFields.add(keyFieldSchema);
- if (!widerows && (cfInfo.compactCqlTable || !cfInfo.cql3Table))
+ if (!widerows)
{
// defined validators/indexes
for (ColumnDef cdef : cfDef.column_metadata)
@@ -445,14 +579,14 @@ public class CassandraStorage extends AbstractCassandraStorage
ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
idxColSchema.setName("name");
- idxColSchema.setType(getPigType(marshallers.get(MarshallerType.COMPARATOR)));
+ idxColSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR)));
ResourceFieldSchema valSchema = new ResourceFieldSchema();
AbstractType validator = validators.get(cdef.name);
if (validator == null)
validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
valSchema.setName("value");
- valSchema.setType(getPigType(validator));
+ valSchema.setType(StorageHelper.getPigType(validator));
innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
allSchemaFields.add(innerTupleField);
@@ -472,7 +606,7 @@ public class CassandraStorage extends AbstractCassandraStorage
AbstractType validator = validators.get(cdef.name);
if (validator == null)
validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
- idxSchema.setType(getPigType(validator));
+ idxSchema.setType(StorageHelper.getPigType(validator));
allSchemaFields.add(idxSchema);
}
}
@@ -485,8 +619,8 @@ public class CassandraStorage extends AbstractCassandraStorage
public void setPartitionFilter(Expression partitionFilter) throws IOException
{
UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
+ Properties property = context.getUDFProperties(CassandraStorage.class);
+ property.setProperty(StorageHelper.PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
}
/** prepare writer */
@@ -495,33 +629,93 @@ public class CassandraStorage extends AbstractCassandraStorage
this.writer = writer;
}
- /** write next row */
- public void putNext(Tuple t) throws IOException
+ /** convert object to ByteBuffer */
+ protected ByteBuffer objToBB(Object o)
{
- /*
- We support two cases for output:
- First, the original output:
- (key, (name, value), (name,value), {(name,value)}) (tuples or bag is optional)
- For supers, we only accept the original output.
- */
+ if (o == null)
+ return nullToBB();
+ if (o instanceof java.lang.String)
+ return ByteBuffer.wrap(new DataByteArray((String)o).get());
+ if (o instanceof Integer)
+ return Int32Type.instance.decompose((Integer)o);
+ if (o instanceof Long)
+ return LongType.instance.decompose((Long)o);
+ if (o instanceof Float)
+ return FloatType.instance.decompose((Float)o);
+ if (o instanceof Double)
+ return DoubleType.instance.decompose((Double)o);
+ if (o instanceof UUID)
+ return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
+ if(o instanceof Tuple) {
+ List<Object> objects = ((Tuple)o).getAll();
+ //collections
+ if (objects.size() > 0 && objects.get(0) instanceof String)
+ {
+ String collectionType = (String) objects.get(0);
+ if ("set".equalsIgnoreCase(collectionType) ||
+ "list".equalsIgnoreCase(collectionType))
+ return objToListOrSetBB(objects.subList(1, objects.size()));
+ else if ("map".equalsIgnoreCase(collectionType))
+ return objToMapBB(objects.subList(1, objects.size()));
- if (t.size() < 1)
+ }
+ return objToCompositeBB(objects);
+ }
+
+ return ByteBuffer.wrap(((DataByteArray) o).get());
+ }
+
+ private ByteBuffer objToListOrSetBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ for(Object sub : objects)
{
- // simply nothing here, we can't even delete without a key
- logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
- return;
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
}
- ByteBuffer key = objToBB(t.get(0));
- if (t.getType(1) == DataType.TUPLE)
- writeColumnsFromTuple(key, t, 1);
- else if (t.getType(1) == DataType.BAG)
+ // NOTE: using protocol v1 serialization format for collections so as to not break
+ // compatibility. Not sure if that's the right thing.
+ return CollectionSerializer.pack(serialized, objects.size(), 1);
+ }
+
+ private ByteBuffer objToMapBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+ for(Object sub : objects)
{
- if (t.size() > 2)
- throw new IOException("No arguments allowed after bag");
- writeColumnsFromBag(key, (DataBag) t.get(1));
+ List<Object> keyValue = ((Tuple)sub).getAll();
+ for (Object entry: keyValue)
+ {
+ ByteBuffer buffer = objToBB(entry);
+ serialized.add(buffer);
+ }
}
- else
- throw new IOException("Second argument in output must be a tuple or bag");
+ // NOTE: using protocol v1 serialization format for collections so as to not break
+ // compatibility. Not sure if that's the right thing.
+ return CollectionSerializer.pack(serialized, objects.size(), 1);
+ }
+
+ private ByteBuffer objToCompositeBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ int totalLength = 0;
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ totalLength += 2 + buffer.remaining() + 1;
+ }
+ ByteBuffer out = ByteBuffer.allocate(totalLength);
+ for (ByteBuffer bb : serialized)
+ {
+ int length = bb.remaining();
+ out.put((byte) ((length >> 8) & 0xFF));
+ out.put((byte) (length & 0xFF));
+ out.put(bb);
+ out.put((byte) 0);
+ }
+ out.flip();
+ return out;
}
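
The loop above writes the legacy CompositeType wire format: for each component a two-byte big-endian length, the component's bytes, and a zero end-of-component byte, which is why totalLength adds 2 + remaining + 1 per component. A self-contained sketch of the same layout with illustrative one- and two-byte components:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    public class CompositeLayoutDemo
    {
        // mirrors objToCompositeBB: 2-byte big-endian length, payload, end-of-component byte
        static ByteBuffer pack(List<byte[]> components)
        {
            int total = 0;
            for (byte[] c : components)
                total += 2 + c.length + 1;
            ByteBuffer out = ByteBuffer.allocate(total);
            for (byte[] c : components)
            {
                out.putShort((short) c.length); // ByteBuffer is big-endian by default
                out.put(c);
                out.put((byte) 0);              // end-of-component marker
            }
            out.flip();
            return out;
        }

        public static void main(String[] args)
        {
            ByteBuffer bb = pack(Arrays.asList("a".getBytes(StandardCharsets.UTF_8),
                                               "bc".getBytes(StandardCharsets.UTF_8)));
            while (bb.hasRemaining())
                System.out.printf("%02x ", bb.get()); // prints: 00 01 61 00 00 02 62 63 00
        }
    }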
/** write tuple data to cassandra */
@@ -643,6 +837,19 @@ public class CassandraStorage extends AbstractCassandraStorage
}
}
+    /** get a list of columns with a defined index */
+ protected List<ColumnDef> getIndexes() throws IOException
+ {
+ CfDef cfdef = getCfDef(loadSignature);
+ List<ColumnDef> indexes = new ArrayList<ColumnDef>();
+ for (ColumnDef cdef : cfdef.column_metadata)
+ {
+ if (cdef.index_type != null)
+ indexes.add(cdef);
+ }
+ return indexes;
+ }
+
/** get a list of Cassandra IndexExpression from Pig expression */
private List<IndexExpression> filterToIndexExpressions(Expression expression) throws IOException
{
@@ -713,13 +920,64 @@ public class CassandraStorage extends AbstractCassandraStorage
return indexClause.getExpressions();
}
+ public ResourceStatistics getStatistics(String location, Job job)
+ {
+ return null;
+ }
+
+ public void cleanupOnFailure(String failure, Job job)
+ {
+ }
+
+    public void cleanupOnSuccess(String location, Job job) throws IOException
+    {
+    }
+
+
+ /** StoreFunc methods */
+ public void setStoreFuncUDFContextSignature(String signature)
+ {
+ this.storeSignature = signature;
+ }
+
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+ {
+ return relativeToAbsolutePath(location, curDir);
+ }
+
+ /** output format */
+ public OutputFormat getOutputFormat() throws IOException
+ {
+ try
+ {
+ return FBUtilities.construct(outputFormatClass, "outputformat");
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+
+ @Override
+ public InputFormat getInputFormat() throws IOException
+ {
+ try
+ {
+ return FBUtilities.construct(inputFormatClass, "inputformat");
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
/** get a list of index expression */
private List<IndexExpression> getIndexExpressions() throws IOException
{
UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
- return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
+ Properties property = context.getUDFProperties(CassandraStorage.class);
+ if (property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE) != null)
+ return indexExpressionsFromString(property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE));
else
return null;
}
@@ -731,6 +989,129 @@ public class CassandraStorage extends AbstractCassandraStorage
return getColumnMeta(client, true, true);
}
+
+    /** get column metadata */
+ protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn)
+ throws org.apache.cassandra.thrift.InvalidRequestException,
+ UnavailableException,
+ TimedOutException,
+ SchemaDisagreementException,
+ TException,
+ CharacterCodingException,
+ org.apache.cassandra.exceptions.InvalidRequestException,
+ ConfigurationException,
+ NotFoundException
+ {
+ String query = String.format("SELECT column_name, validator, index_type, type " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+ SystemKeyspace.NAME,
+ LegacySchemaTables.COLUMNS,
+ keyspace,
+ column_family);
+
+ CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
+
+ List<CqlRow> rows = result.rows;
+ List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
+ if (rows == null || rows.isEmpty())
+ {
+ // if CassandraStorage, just return the empty list
+ if (cassandraStorage)
+ return columnDefs;
+
+ // otherwise for CqlNativeStorage, check metadata for classic thrift tables
+ CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
+ for (ColumnDefinition def : cfm.regularAndStaticColumns())
+ {
+ ColumnDef cDef = new ColumnDef();
+ String columnName = def.name.toString();
+ String type = def.type.toString();
+ logger.debug("name: {}, type: {} ", columnName, type);
+ cDef.name = ByteBufferUtil.bytes(columnName);
+ cDef.validation_class = type;
+ columnDefs.add(cDef);
+ }
+ // we may not need to include the value column for compact tables as we
+ // could have already processed it as schema_columnfamilies.value_alias
+ if (columnDefs.size() == 0 && includeCompactValueColumn && cfm.compactValueColumn() != null)
+ {
+ ColumnDefinition def = cfm.compactValueColumn();
+ if ("value".equals(def.name.toString()))
+ {
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = def.name.bytes;
+ cDef.validation_class = def.type.toString();
+ columnDefs.add(cDef);
+ }
+ }
+ return columnDefs;
+ }
+
+ Iterator<CqlRow> iterator = rows.iterator();
+ while (iterator.hasNext())
+ {
+ CqlRow row = iterator.next();
+ ColumnDef cDef = new ColumnDef();
+ String type = ByteBufferUtil.string(row.getColumns().get(3).value);
+ if (!type.equals("regular"))
+ continue;
+ cDef.setName(ByteBufferUtil.clone(row.getColumns().get(0).value));
+ cDef.validation_class = ByteBufferUtil.string(row.getColumns().get(1).value);
+ ByteBuffer indexType = row.getColumns().get(2).value;
+ if (indexType != null)
+ cDef.index_type = getIndexType(ByteBufferUtil.string(indexType));
+ columnDefs.add(cDef);
+ }
+ return columnDefs;
+ }
+
+
+ /** get CFMetaData of a column family */
+ protected CFMetaData getCFMetaData(String ks, String cf, Cassandra.Client client)
+ throws NotFoundException,
+ org.apache.cassandra.thrift.InvalidRequestException,
+ TException,
+ org.apache.cassandra.exceptions.InvalidRequestException,
+ ConfigurationException
+ {
+ KsDef ksDef = client.describe_keyspace(ks);
+ for (CfDef cfDef : ksDef.cf_defs)
+ {
+ if (cfDef.name.equalsIgnoreCase(cf))
+ return ThriftConversion.fromThrift(cfDef);
+ }
+ return null;
+ }
+
+ /** get index type from string */
+ protected IndexType getIndexType(String type)
+ {
+ type = type.toLowerCase();
+ if ("keys".equals(type))
+ return IndexType.KEYS;
+ else if("custom".equals(type))
+ return IndexType.CUSTOM;
+ else if("composites".equals(type))
+ return IndexType.COMPOSITES;
+ else
+ return null;
+ }
+
+ /** return partition keys */
+ public String[] getPartitionKeys(String location, Job job) throws IOException
+ {
+ if (!usePartitionFilter)
+ return null;
+ List<ColumnDef> indexes = getIndexes();
+ String[] partitionKeys = new String[indexes.size()];
+ for (int i = 0; i < indexes.size(); i++)
+ {
+ partitionKeys[i] = new String(indexes.get(i).getName());
+ }
+ return partitionKeys;
+ }
+
/** convert key to a tuple */
private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
{
@@ -744,15 +1125,26 @@ public class CassandraStorage extends AbstractCassandraStorage
{
if( comparator instanceof AbstractCompositeType )
{
- setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key));
+ StorageHelper.setTupleValue(tuple, 0, composeComposite((AbstractCompositeType) comparator, key));
}
else
{
- setTupleValue(tuple, 0, cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key));
+ StorageHelper.setTupleValue(tuple, 0, StorageHelper.cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key, nativeProtocolVersion));
}
}
+ /** Deconstructs a composite type to a Tuple. */
+ protected Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException
+ {
+ List<AbstractCompositeType.CompositeComponent> result = comparator.deconstruct(name);
+ Tuple t = TupleFactory.getInstance().newTuple(result.size());
+ for (int i=0; i<result.size(); i++)
+ StorageHelper.setTupleValue(t, i, StorageHelper.cassandraToObj(result.get(i).comparator, result.get(i).value, nativeProtocolVersion));
+
+ return t;
+ }
+
/** cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>
* [&reversed=true][&limit=1][&allow_deletes=true][&widerows=true]
* [&use_secondary=true][&comparator=<comparator>][&partitioner=<partitioner>]]*/
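
A hedged example of a location string in this scheme; the credentials, keyspace, table, and options are all illustrative:

    cassandra://pig_user:secret@MyKeyspace/MyColumnFamily?limit=100&allow_deletes=true&widerows=true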
@@ -817,10 +1209,206 @@ public class CassandraStorage extends AbstractCassandraStorage
"[&init_address=<host>][&rpc_port=<port>]]': " + e.getMessage());
}
}
-
+
+
+    /** decompose the query string into a parameter map */
+ public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
+ {
+ String[] params = query.split("&");
+ Map<String, String> map = new HashMap<String, String>(params.length);
+ for (String param : params)
+ {
+ String[] keyValue = param.split("=");
+ map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8"));
+ }
+ return map;
+ }
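
A usage sketch for getQueryMap() with an illustrative query string; note that the method assumes exactly one '=' per parameter:

    Map<String, String> opts = getQueryMap("slice_start=a&limit=1&reversed=true");
    opts.get("slice_start"); // -> "a"
    opts.get("limit");       // -> "1"
    opts.get("reversed");    // -> "true"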
+
public ByteBuffer nullToBB()
{
return null;
}
-}
+    /** return the CfDef for the column family */
+ protected CfDef getCfDef(Cassandra.Client client)
+ throws org.apache.cassandra.thrift.InvalidRequestException,
+ UnavailableException,
+ TimedOutException,
+ SchemaDisagreementException,
+ TException,
+ NotFoundException,
+ org.apache.cassandra.exceptions.InvalidRequestException,
+ ConfigurationException,
+ IOException
+ {
+ // get CF meta data
+ String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+ SystemKeyspace.NAME,
+ LegacySchemaTables.COLUMNFAMILIES,
+ keyspace,
+ column_family);
+
+ CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
+
+ if (result == null || result.rows == null || result.rows.isEmpty())
+ return null;
+
+ Iterator<CqlRow> iteraRow = result.rows.iterator();
+ CfDef cfDef = new CfDef();
+ cfDef.keyspace = keyspace;
+ cfDef.name = column_family;
+ if (iteraRow.hasNext())
+ {
+ CqlRow cqlRow = iteraRow.next();
+
+ cfDef.column_type = ByteBufferUtil.string(cqlRow.columns.get(0).value);
+ cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(1).value);
+ ByteBuffer subComparator = cqlRow.columns.get(2).value;
+ if (subComparator != null)
+ cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
+ cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value);
+ cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value);
+ }
+ cfDef.column_metadata = getColumnMetadata(client);
+ return cfDef;
+ }
+
+ /** get the columnfamily definition for the signature */
+ protected CfDef getCfDef(String signature) throws IOException
+ {
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(CassandraStorage.class);
+ String prop = property.getProperty(signature);
+ return cfdefFromString(prop);
+ }
+
+ /** convert string back to CfDef */
+ protected static CfDef cfdefFromString(String st) throws IOException
+ {
+ assert st != null;
+ TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+ CfDef cfDef = new CfDef();
+ try
+ {
+ deserializer.deserialize(cfDef, Hex.hexToBytes(st));
+ }
+ catch (TException e)
+ {
+ throw new IOException(e);
+ }
+ return cfDef;
+ }
+
+ /** convert CfDef to string */
+ protected static String cfdefToString(CfDef cfDef) throws IOException
+ {
+ assert cfDef != null;
+ // this is so awful it's kind of cool!
+ TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
+ try
+ {
+ return Hex.bytesToHex(serializer.serialize(cfDef));
+ }
+ catch (TException e)
+ {
+ throw new IOException(e);
+ }
+ }
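
Together, cfdefToString() and cfdefFromString() round-trip a Thrift CfDef through hex-encoded binary so the schema can be cached in a java.util.Properties entry keyed by the UDF signature. A sketch of the round trip, assuming same-package access and hypothetical keyspace/table names:

    CfDef original = new CfDef("ks1", "cf1");    // Thrift-generated required-fields constructor
    String hex = cfdefToString(original);        // Thrift binary -> hex string
    CfDef decoded = cfdefFromString(hex);
    assert original.equals(decoded);             // Thrift structs implement structural equality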
+
+    /** parse the string to a Cassandra data type */
+ protected AbstractType parseType(String type) throws IOException
+ {
+ try
+ {
+ // always treat counters like longs, specifically CCT.compose is not what we need
+ if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
+ return LongType.instance;
+ return TypeParser.parse(type);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ catch (SyntaxException e)
+ {
+ throw new IOException(e);
+ }
+ }
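
One behavior worth noting: counters are deliberately surfaced as longs. An illustrative call:

    AbstractType t = parseType("org.apache.cassandra.db.marshal.CounterColumnType");
    // t == LongType.instance, so counter cells decompose like ordinary longs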
+
+ /** convert a column to a tuple */
+ protected Tuple columnToTuple(Cell col, CfDef cfDef, AbstractType comparator) throws IOException
+ {
+ Tuple pair = TupleFactory.getInstance().newTuple(2);
+
+ ByteBuffer colName = col.name().toByteBuffer();
+
+ // name
+ if(comparator instanceof AbstractCompositeType)
+ StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, colName));
+ else
+ StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, colName, nativeProtocolVersion));
+
+ // value
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+ if (validators.get(colName) == null)
+ {
+ Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value(), nativeProtocolVersion));
+ }
+ else
+ StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(colName), col.value(), nativeProtocolVersion));
+ return pair;
+ }
+
+    /** construct a map from marshaller type to Cassandra data type */
+ protected Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
+ {
+ Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class);
+ AbstractType comparator;
+ AbstractType subcomparator;
+ AbstractType default_validator;
+ AbstractType key_validator;
+
+ comparator = parseType(cfDef.getComparator_type());
+ subcomparator = parseType(cfDef.getSubcomparator_type());
+ default_validator = parseType(cfDef.getDefault_validation_class());
+ key_validator = parseType(cfDef.getKey_validation_class());
+
+ marshallers.put(MarshallerType.COMPARATOR, comparator);
+ marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator);
+ marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator);
+ marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator);
+ return marshallers;
+ }
+
+ /** get the validators */
+ protected Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
+ {
+ Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
+ for (ColumnDef cd : cfDef.getColumn_metadata())
+ {
+ if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
+ {
+ AbstractType validator = null;
+ try
+ {
+ validator = TypeParser.parse(cd.getValidation_class());
+ if (validator instanceof CounterColumnType)
+ validator = LongType.instance;
+ validators.put(cd.name, validator);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ catch (SyntaxException e)
+ {
+ throw new IOException(e);
+ }
+ }
+ }
+ return validators;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 7887085..91cdd02 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -17,48 +17,78 @@
*/
package org.apache.cassandra.hadoop.pig;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
import java.util.*;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
import org.apache.cassandra.db.BufferCell;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.AuthenticationException;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.HadoopCompat;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.utils.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
-import org.apache.pig.Expression;
-import org.apache.pig.ResourceSchema;
+import org.apache.pig.*;
import org.apache.pig.Expression.OpType;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.*;
import org.apache.pig.impl.util.UDFContext;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.external.biz.base64Coder.Base64Coder;
-import com.datastax.driver.core.Row;
-public class CqlNativeStorage extends AbstractCassandraStorage
+public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
{
+ protected String DEFAULT_INPUT_FORMAT;
+ protected String DEFAULT_OUTPUT_FORMAT;
+
+ protected String username;
+ protected String password;
+ protected String keyspace;
+ protected String column_family;
+ protected String loadSignature;
+ protected String storeSignature;
+
+ protected Configuration conf;
+ protected String inputFormatClass;
+ protected String outputFormatClass;
+ protected int splitSize = 64 * 1024;
+ protected String partitionerClass;
+ protected boolean usePartitionFilter = false;
+ protected String initHostAddress;
+ protected String rpcPort;
+ protected int nativeProtocolVersion = 1;
+
private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class);
private int pageSize = 1000;
private String columns;
private String outputQuery;
private String whereClause;
- private boolean hasCompactValueAlias = false;
private RecordReader<Long, Row> reader;
private RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
@@ -119,21 +149,20 @@ public class CqlNativeStorage extends AbstractCassandraStorage
if (!reader.nextKeyValue())
return null;
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ TableInfo tableMetadata = getCfInfo(loadSignature);
Row row = reader.getCurrentValue();
- Tuple tuple = TupleFactory.getInstance().newTuple(cfDef.column_metadata.size());
- Iterator<ColumnDef> itera = cfDef.column_metadata.iterator();
+ Tuple tuple = TupleFactory.getInstance().newTuple(tableMetadata.getColumns().size());
+ Iterator<ColumnInfo> itera = tableMetadata.getColumns().iterator();
int i = 0;
while (itera.hasNext())
{
- ColumnDef cdef = itera.next();
- ByteBuffer columnValue = row.getBytesUnsafe(ByteBufferUtil.string(cdef.name.duplicate()));
+ ColumnInfo cdef = itera.next();
+ ByteBuffer columnValue = row.getBytesUnsafe(cdef.getName());
if (columnValue != null)
{
- Cell cell = new BufferCell(CellNames.simpleDense(cdef.name), columnValue);
- AbstractType<?> validator = getValidatorMap(cfDef).get(cdef.name);
- setTupleValue(tuple, i, cqlColumnToObj(cell, cfDef), validator);
+ Cell cell = new BufferCell(CellNames.simpleDense(ByteBufferUtil.bytes(cdef.getName())), columnValue);
+ AbstractType<?> validator = getValidatorMap(tableMetadata).get(ByteBufferUtil.bytes(cdef.getName()));
+ setTupleValue(tuple, i, cqlColumnToObj(cell, tableMetadata), validator);
}
else
tuple.set(i, null);
@@ -148,15 +177,12 @@ public class CqlNativeStorage extends AbstractCassandraStorage
}
/** convert a cql column to an object */
- private Object cqlColumnToObj(Cell col, CfDef cfDef) throws IOException
+ private Object cqlColumnToObj(Cell col, TableInfo cfDef) throws IOException
{
// standard
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
ByteBuffer cellName = col.name().toByteBuffer();
- if (validators.get(cellName) == null)
- return cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.DEFAULT_VALIDATOR), col.value());
- else
- return cassandraToObj(validators.get(cellName), col.value());
+ return StorageHelper.cassandraToObj(validators.get(cellName), col.value(), nativeProtocolVersion);
}
/** set the value to the position of the tuple */
@@ -165,7 +191,7 @@ public class CqlNativeStorage extends AbstractCassandraStorage
if (validator instanceof CollectionType)
setCollectionTupleValues(tuple, position, value, validator);
else
- setTupleValue(tuple, position, value);
+ StorageHelper.setTupleValue(tuple, position, value);
}
/** set the values of set/list at and after the position of the tuple */
@@ -220,173 +246,33 @@ public class CqlNativeStorage extends AbstractCassandraStorage
return obj;
}
- /** include key columns */
- protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
- throws InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException,
- TException,
- CharacterCodingException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- NotFoundException
- {
- List<ColumnDef> keyColumns = null;
- // get key columns
+ /** get the columnfamily definition for the signature */
+ protected TableInfo getCfInfo(String signature) throws IOException
+ {
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(CqlNativeStorage.class);
+ TableInfo cfInfo;
try
{
- keyColumns = getKeysMeta(client);
+ cfInfo = cfdefFromString(property.getProperty(signature));
}
- catch(Exception e)
+ catch (ClassNotFoundException e)
{
- logger.error("Error in retrieving key columns" , e);
+ throw new IOException(e);
}
-
- // get other columns
- List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias);
-
- // combine all columns in a list
- if (keyColumns != null && columns != null)
- keyColumns.addAll(columns);
-
- return keyColumns;
+ return cfInfo;
}
- /** get keys meta data */
- private List<ColumnDef> getKeysMeta(Cassandra.Client client)
- throws Exception
+    /** return the TableMetadata for the column family */
+ protected TableMetadata getCfInfo(Session client)
+ throws NoHostAvailableException,
+ AuthenticationException,
+ IllegalStateException
{
- String query = "SELECT key_aliases, " +
- " column_aliases, " +
- " key_validator, " +
- " comparator, " +
- " keyspace_name, " +
- " value_alias, " +
- " default_validator " +
- "FROM system.schema_columnfamilies " +
- "WHERE keyspace_name = '%s'" +
- " AND columnfamily_name = '%s' ";
-
- CqlResult result = client.execute_cql3_query(
- ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
- Compression.NONE,
- ConsistencyLevel.ONE);
-
- if (result == null || result.rows == null || result.rows.isEmpty())
- return null;
-
- Iterator<CqlRow> iteraRow = result.rows.iterator();
- List<ColumnDef> keys = new ArrayList<ColumnDef>();
- if (iteraRow.hasNext())
- {
- CqlRow cqlRow = iteraRow.next();
- String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
- logger.debug("Found ksDef name: {}", name);
- String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
-
- logger.debug("partition keys: {}", keyString);
- List<String> keyNames = FBUtilities.fromJsonList(keyString);
-
- Iterator<String> iterator = keyNames.iterator();
- while (iterator.hasNext())
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(iterator.next());
- keys.add(cDef);
- }
- // classic thrift tables
- if (keys.size() == 0)
- {
- CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
- for (ColumnDefinition def : cfm.partitionKeyColumns())
- {
- String key = def.name.toString();
- logger.debug("name: {} ", key);
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(key);
- keys.add(cDef);
- }
- for (ColumnDefinition def : cfm.clusteringColumns())
- {
- String key = def.name.toString();
- logger.debug("name: {} ", key);
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(key);
- keys.add(cDef);
- }
- }
-
- keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
-
- logger.debug("cluster keys: {}", keyString);
- keyNames = FBUtilities.fromJsonList(keyString);
-
- iterator = keyNames.iterator();
- while (iterator.hasNext())
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(iterator.next());
- keys.add(cDef);
- }
-
- String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
- logger.debug("row key validator: {}", validator);
- AbstractType<?> keyValidator = parseType(validator);
-
- Iterator<ColumnDef> keyItera = keys.iterator();
- if (keyValidator instanceof CompositeType)
- {
- Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
- while (typeItera.hasNext())
- keyItera.next().validation_class = typeItera.next().toString();
- }
- else
- keyItera.next().validation_class = keyValidator.toString();
-
- validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
- logger.debug("cluster key validator: {}", validator);
-
- if (keyItera.hasNext() && validator != null && !validator.isEmpty())
- {
- AbstractType<?> clusterKeyValidator = parseType(validator);
-
- if (clusterKeyValidator instanceof CompositeType)
- {
- Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
- while (keyItera.hasNext())
- keyItera.next().validation_class = typeItera.next().toString();
- }
- else
- keyItera.next().validation_class = clusterKeyValidator.toString();
- }
-
- // compact value_alias column
- if (cqlRow.columns.get(5).value != null)
- {
- try
- {
- String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
- logger.debug("default validator: {}", compactValidator);
- AbstractType<?> defaultValidator = parseType(compactValidator);
-
- ColumnDef cDef = new ColumnDef();
- cDef.name = cqlRow.columns.get(5).value;
- cDef.validation_class = defaultValidator.toString();
- keys.add(cDef);
- hasCompactValueAlias = true;
- }
- catch (Exception e)
- {
- // no compact column at value_alias
- }
- }
-
- }
- return keys;
+ // get CF meta data
+ return client.getCluster().getMetadata().getKeyspace(Metadata.quote(keyspace)).getTable(Metadata.quote(column_family));
}
-
/** output: (((name, value), (name, value)), (value ... value), (value...value)) */
public void putNext(Tuple t) throws IOException
{
@@ -441,6 +327,91 @@ public class CqlNativeStorage extends AbstractCassandraStorage
return keys;
}
+ /** convert object to ByteBuffer */
+ protected ByteBuffer objToBB(Object o)
+ {
+ if (o == null)
+ return nullToBB();
+ if (o instanceof java.lang.String)
+ return ByteBuffer.wrap(new DataByteArray((String)o).get());
+ if (o instanceof Integer)
+ return Int32Type.instance.decompose((Integer)o);
+ if (o instanceof Long)
+ return LongType.instance.decompose((Long)o);
+ if (o instanceof Float)
+ return FloatType.instance.decompose((Float)o);
+ if (o instanceof Double)
+ return DoubleType.instance.decompose((Double)o);
+ if (o instanceof UUID)
+ return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
+ if(o instanceof Tuple) {
+ List<Object> objects = ((Tuple)o).getAll();
+ //collections
+ if (objects.size() > 0 && objects.get(0) instanceof String)
+ {
+ String collectionType = (String) objects.get(0);
+ if ("set".equalsIgnoreCase(collectionType) ||
+ "list".equalsIgnoreCase(collectionType))
+ return objToListOrSetBB(objects.subList(1, objects.size()));
+ else if ("map".equalsIgnoreCase(collectionType))
+ return objToMapBB(objects.subList(1, objects.size()));
+
+ }
+ return objToCompositeBB(objects);
+ }
+
+ return ByteBuffer.wrap(((DataByteArray) o).get());
+ }
+
+ private ByteBuffer objToListOrSetBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ }
+ return CollectionSerializer.pack(serialized, objects.size(), 3);
+ }
+
+ private ByteBuffer objToMapBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+ for(Object sub : objects)
+ {
+ List<Object> keyValue = ((Tuple)sub).getAll();
+ for (Object entry: keyValue)
+ {
+ ByteBuffer buffer = objToBB(entry);
+ serialized.add(buffer);
+ }
+ }
+ return CollectionSerializer.pack(serialized, objects.size(), 3);
+ }
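
Note the contrast with CassandraStorage above: collections here are packed with native protocol version 3 (the final argument to CollectionSerializer.pack), which uses 4-byte element counts and sizes, while CassandraStorage keeps protocol version 1's 2-byte format for compatibility.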
+
+ private ByteBuffer objToCompositeBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ int totalLength = 0;
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ totalLength += 2 + buffer.remaining() + 1;
+ }
+ ByteBuffer out = ByteBuffer.allocate(totalLength);
+ for (ByteBuffer bb : serialized)
+ {
+ int length = bb.remaining();
+ out.put((byte) ((length >> 8) & 0xFF));
+ out.put((byte) (length & 0xFF));
+ out.put(bb);
+ out.put((byte) 0);
+ }
+ out.flip();
+ return out;
+ }
+
/** send CQL query request using data from tuple */
private void cqlQueryFromTuple(Map<String, ByteBuffer> key, Tuple t, int offset) throws IOException
{
@@ -487,30 +458,50 @@ public class CqlNativeStorage extends AbstractCassandraStorage
}
}
+ /** get the validators */
+ protected Map<ByteBuffer, AbstractType> getValidatorMap(TableInfo cfDef) throws IOException
+ {
+ Map<ByteBuffer, AbstractType> validators = new HashMap<>();
+ for (ColumnInfo cd : cfDef.getColumns())
+ {
+ if (cd.getTypeName() != null)
+ {
+ try
+ {
+ AbstractType validator = TypeParser.parseCqlName(cd.getTypeName());
+ if (validator instanceof CounterColumnType)
+ validator = LongType.instance;
+ validators.put(ByteBufferUtil.bytes(cd.getName()), validator);
+ }
+ catch (ConfigurationException | SyntaxException e)
+ {
+ throw new IOException(e);
+ }
+ }
+ }
+ return validators;
+ }
+
/** schema: (value, value, value) where keys are in the front. */
public ResourceSchema getSchema(String location, Job job) throws IOException
{
setLocation(location, job);
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ TableInfo cfInfo = getCfInfo(loadSignature);
// top-level schema, no type
ResourceSchema schema = new ResourceSchema();
- // get default marshallers and validators
- Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
- Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfDef);
+ // get default validators
+ Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfInfo);
// will contain all fields for this schema
List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
- for (ColumnDef cdef : cfDef.column_metadata)
+ for (ColumnInfo cdef : cfInfo.getColumns())
{
ResourceFieldSchema valSchema = new ResourceFieldSchema();
- AbstractType validator = validators.get(cdef.name);
- if (validator == null)
- validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
+ AbstractType validator = validators.get(cdef.getName());
valSchema.setName(new String(cdef.getName()));
- valSchema.setType(getPigType(validator));
+ valSchema.setType(StorageHelper.getPigType(validator));
allSchemaFields.add(valSchema);
}
@@ -522,8 +513,8 @@ public class CqlNativeStorage extends AbstractCassandraStorage
public void setPartitionFilter(Expression partitionFilter) throws IOException
{
UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- property.setProperty(PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter));
+ Properties property = context.getUDFProperties(CqlNativeStorage.class);
+ property.setProperty(StorageHelper.PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter));
}
/**
@@ -557,8 +548,8 @@ public class CqlNativeStorage extends AbstractCassandraStorage
private String getWhereClauseForPartitionFilter()
{
UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- return property.getProperty(PARTITION_FILTER_SIGNATURE);
+ Properties property = context.getUDFProperties(CqlNativeStorage.class);
+ return property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE);
}
/** set read configuration settings */
@@ -631,7 +622,7 @@ public class CqlNativeStorage extends AbstractCassandraStorage
CqlConfigHelper.setInputWhereClauses(conf, whereClause);
String whereClauseForPartitionFilter = getWhereClauseForPartitionFilter();
- String wc = whereClause != null && !whereClause.trim().isEmpty()
+ String wc = whereClause != null && !whereClause.trim().isEmpty()
? whereClauseForPartitionFilter == null ? whereClause: String.format("%s AND %s", whereClause.trim(), whereClauseForPartitionFilter)
: whereClauseForPartitionFilter;
@@ -639,17 +630,17 @@ public class CqlNativeStorage extends AbstractCassandraStorage
{
logger.debug("where clause: {}", wc);
CqlConfigHelper.setInputWhereClauses(conf, wc);
- }
- if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+ }
+ if (System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE) != null)
{
try
{
- ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+ ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE)));
}
catch (NumberFormatException e)
{
throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e);
- }
+ }
}
if (ConfigHelper.getInputInitialAddress(conf) == null)
@@ -700,6 +691,74 @@ public class CqlNativeStorage extends AbstractCassandraStorage
initSchema(storeSignature);
}
+ /** Methods to get the column family schema from Cassandra */
+ protected void initSchema(String signature) throws IOException
+ {
+ Properties properties = UDFContext.getUDFContext().getUDFProperties(CqlNativeStorage.class);
+
+ // Only get the schema if we haven't already gotten it
+ if (!properties.containsKey(signature))
+ {
+ try
+ {
+ Session client = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf).connect();
+ client.execute("USE " + keyspace);
+
+            // fetch the TableMetadata for the column family
+ TableMetadata cfInfo = getCfInfo(client);
+
+ if (cfInfo != null)
+ {
+ properties.setProperty(signature, cfdefToString(cfInfo));
+ }
+ else
+ throw new IOException(String.format("Table '%s' not found in keyspace '%s'",
+ column_family,
+ keyspace));
+ }
+ catch (Exception e)
+ {
+ throw new IOException(e);
+ }
+ }
+ }
+
+
+    /** serialize TableMetadata (as a TableInfo) to a Base64 string */
+ protected static String cfdefToString(TableMetadata cfDef) throws IOException
+ {
+ TableInfo tableInfo = new TableInfo(cfDef);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(tableInfo);
+        oos.close();
+        return new String(Base64Coder.encode(baos.toByteArray()));
+ }
+
+    /** deserialize a Base64 string back to a TableInfo */
+ protected static TableInfo cfdefFromString(String st) throws IOException, ClassNotFoundException
+ {
+        byte[] data = Base64Coder.decode(st);
+        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
+        Object o = ois.readObject();
+        ois.close();
+        return (TableInfo) o;
+ }
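
Unlike CassandraStorage, which Thrift-serializes a CfDef to hex, this class snapshots the driver's TableMetadata into the Serializable TableInfo wrapper defined at the bottom of this file and round-trips it through Java serialization plus Base64. A sketch of the round trip, assuming an open driver Session named session and hypothetical keyspace/table names:

    TableMetadata meta = session.getCluster().getMetadata()
                                .getKeyspace("ks1").getTable("cf1");
    String encoded = cfdefToString(meta);          // TableInfo -> ObjectOutputStream -> Base64
    TableInfo decoded = cfdefFromString(encoded);
    decoded.getName();                             // -> "cf1"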
+
+    /** decompose the query string into a parameter map */
+ public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
+ {
+ String[] params = query.split("&");
+ Map<String, String> map = new HashMap<String, String>(params.length);
+ for (String param : params)
+ {
+ String[] keyValue = param.split("=");
+ map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8"));
+ }
+ return map;
+ }
+
private void setLocationFromUri(String location) throws IOException
{
try
@@ -808,11 +867,171 @@ public class CqlNativeStorage extends AbstractCassandraStorage
}
}
- /**
- * Thrift API can't handle null, so use empty byte array
- */
public ByteBuffer nullToBB()
{
return ByteBuffer.wrap(new byte[0]);
}
+
+ /** output format */
+ public OutputFormat getOutputFormat() throws IOException
+ {
+ try
+ {
+ return FBUtilities.construct(outputFormatClass, "outputformat");
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ public void cleanupOnFailure(String failure, Job job)
+ {
+ }
+
+    public void cleanupOnSuccess(String location, Job job) throws IOException
+    {
+    }
+
+ /** return partition keys */
+ public String[] getPartitionKeys(String location, Job job) throws IOException
+ {
+ if (!usePartitionFilter)
+ return null;
+ TableInfo tableMetadata = getCfInfo(loadSignature);
+ String[] partitionKeys = new String[tableMetadata.getPartitionKey().size()];
+ for (int i = 0; i < tableMetadata.getPartitionKey().size(); i++)
+ {
+ partitionKeys[i] = new String(tableMetadata.getPartitionKey().get(i).getName());
+ }
+ return partitionKeys;
+ }
+
+ public void checkSchema(ResourceSchema schema) throws IOException
+ {
+        // we don't care about types, they all get cast to ByteBuffers
+ }
+
+ public ResourceStatistics getStatistics(String location, Job job)
+ {
+ return null;
+ }
+
+ @Override
+ public InputFormat getInputFormat() throws IOException
+ {
+ try
+ {
+ return FBUtilities.construct(inputFormatClass, "inputformat");
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+ {
+ return relativeToAbsolutePath(location, curDir);
+ }
+
+ @Override
+ public String relativeToAbsolutePath(String location, Path curDir) throws IOException
+ {
+ return location;
+ }
+
+ @Override
+ public void setUDFContextSignature(String signature)
+ {
+ this.loadSignature = signature;
+ }
+
+ /** StoreFunc methods */
+ public void setStoreFuncUDFContextSignature(String signature)
+ {
+ this.storeSignature = signature;
+ }
+
+ /** set hadoop cassandra connection settings */
+ protected void setConnectionInformation() throws IOException
+ {
+ StorageHelper.setConnectionInformation(conf);
+ if (System.getenv(StorageHelper.PIG_INPUT_FORMAT) != null)
+ inputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_INPUT_FORMAT));
+ else
+ inputFormatClass = DEFAULT_INPUT_FORMAT;
+ if (System.getenv(StorageHelper.PIG_OUTPUT_FORMAT) != null)
+ outputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_OUTPUT_FORMAT));
+ else
+ outputFormatClass = DEFAULT_OUTPUT_FORMAT;
+ }
+
+ /** expand to a fully-qualified class name, defaulting to the hadoop package */
+ protected String getFullyQualifiedClassName(String classname)
+ {
+ return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
+ }
+}
+
+class TableInfo implements Serializable
+{
+ private final List<ColumnInfo> columns;
+ private final List<ColumnInfo> partitionKey;
+ private final String name;
+
+ public TableInfo(TableMetadata tableMetadata)
+ {
+ List<ColumnMetadata> cmColumns = tableMetadata.getColumns();
+ columns = new ArrayList<>(cmColumns.size());
+ for (ColumnMetadata cm : cmColumns)
+ {
+ columns.add(new ColumnInfo(this, cm));
+ }
+ List<ColumnMetadata> cmPartitionKey = tableMetadata.getPartitionKey();
+ partitionKey = new ArrayList<>(cmPartitionKey.size());
+ for (ColumnMetadata cm : cmPartitionKey)
+ {
+ partitionKey.add(new ColumnInfo(this, cm));
+ }
+ name = tableMetadata.getName();
+ }
+
+ public List<ColumnInfo> getPartitionKey()
+ {
+ return partitionKey;
+ }
+
+ public List<ColumnInfo> getColumns()
+ {
+ return columns;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
}
+
+class ColumnInfo implements Serializable
+{
+ private final TableInfo table;
+ private final String name;
+ private final String typeName;
+
+ public ColumnInfo(TableInfo tableInfo, ColumnMetadata columnMetadata)
+ {
+ table = tableInfo;
+ name = columnMetadata.getName();
+ typeName = columnMetadata.getType().toString();
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public String getTypeName()
+ {
+ return typeName;
+ }
+}
\ No newline at end of file
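The TableInfo/ColumnInfo pair above exists so the schema snapshot can be Java-serialized and shipped through the UDF properties from the Pig front end to the back end; the driver's TableMetadata is not suited to that, so cfdefToString flattens it into a TableInfo, Base64-encodes the object stream, and cfdefFromString restores it later. A minimal self-contained sketch of the same round-trip, using java.util.Base64 in place of Pig's Base64Coder (class and method names here are illustrative only, not part of the patch):

    import java.io.*;
    import java.util.Base64;

    public class RoundTripSketch
    {
        // serialize a Serializable graph to a Base64 string, as cfdefToString does
        static String toBase64(Serializable obj) throws IOException
        {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(baos))
            {
                oos.writeObject(obj);
            }
            return Base64.getEncoder().encodeToString(baos.toByteArray());
        }

        // decode and deserialize, as cfdefFromString does
        static Object fromBase64(String s) throws IOException, ClassNotFoundException
        {
            byte[] data = Base64.getDecoder().decode(s);
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)))
            {
                return ois.readObject();
            }
        }

        public static void main(String[] args) throws Exception
        {
            String encoded = toBase64("stand-in for a TableInfo instance");
            System.out.println(fromBase64(encoded));
        }
    }

Plain Java serialization keeps the snapshot opaque to Pig; the trade-off is that TableInfo's shape effectively becomes part of what travels between the job's front end and back end.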
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java b/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
new file mode 100644
index 0000000..66836b2
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
@@ -0,0 +1,121 @@
+package org.apache.cassandra.hadoop.pig;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.serializers.CollectionSerializer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+public class StorageHelper
+{
+ // system environment variables that can be set to configure connection info:
+ // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
+ public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
+ public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
+ public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
+ public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
+ public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
+ public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
+ public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
+ public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
+ public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+ public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
+ public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
+ public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
+
+ public final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
+
+ protected static void setConnectionInformation(Configuration conf)
+ {
+ if (System.getenv(PIG_RPC_PORT) != null)
+ {
+ ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
+ ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
+ }
+
+ if (System.getenv(PIG_INPUT_RPC_PORT) != null)
+ ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT));
+ if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
+ ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT));
+
+ if (System.getenv(PIG_INITIAL_ADDRESS) != null)
+ {
+ ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+ ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+ }
+ if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
+ ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS));
+ if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
+ ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
+
+ if (System.getenv(PIG_PARTITIONER) != null)
+ {
+ ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
+ ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
+ }
+ if (System.getenv(PIG_INPUT_PARTITIONER) != null)
+ ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
+ if (System.getenv(PIG_OUTPUT_PARTITIONER) != null)
+ ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
+ }
+
+ protected static Object cassandraToObj(AbstractType validator, ByteBuffer value, int nativeProtocolVersion)
+ {
+ if (validator instanceof DecimalType || validator instanceof InetAddressType)
+ return validator.getString(value);
+
+ if (validator instanceof CollectionType)
+ {
+ // For CollectionType, the compose() method assumes the v3 protocol format for collections, which
+ // is not correct here since we query using the CQL-over-thrift interface, which uses the pre-v3 format
+ return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion);
+ }
+
+ return validator.compose(value);
+ }
+
+ /** set the value at the given position in the tuple */
+ protected static void setTupleValue(Tuple pair, int position, Object value) throws ExecException
+ {
+ if (value instanceof BigInteger)
+ pair.set(position, ((BigInteger) value).intValue());
+ else if (value instanceof ByteBuffer)
+ pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
+ else if (value instanceof UUID)
+ pair.set(position, new DataByteArray(UUIDGen.decompose((UUID) value)));
+ else if (value instanceof Date)
+ pair.set(position, TimestampType.instance.decompose((Date) value).getLong());
+ else
+ pair.set(position, value);
+ }
+
+ /** get the pig type for the given cassandra data type */
+ protected static byte getPigType(AbstractType type)
+ {
+ if (type instanceof LongType || type instanceof DateType || type instanceof TimestampType) // DateType is bad and it should feel bad
+ return DataType.LONG;
+ else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger
+ return DataType.INTEGER;
+ else if (type instanceof AsciiType || type instanceof UTF8Type || type instanceof DecimalType || type instanceof InetAddressType)
+ return DataType.CHARARRAY;
+ else if (type instanceof FloatType)
+ return DataType.FLOAT;
+ else if (type instanceof DoubleType)
+ return DataType.DOUBLE;
+ else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
+ return DataType.TUPLE;
+
+ return DataType.BYTEARRAY;
+ }
+}
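One subtlety worth calling out in setConnectionInformation above: the generic variables (PIG_RPC_PORT, PIG_INITIAL_ADDRESS, PIG_PARTITIONER) seed both the input and output sides first, and the PIG_INPUT_*/PIG_OUTPUT_* variants then win simply because they are applied afterwards. A condensed sketch of that effective precedence (PrecedenceSketch and resolve() are hypothetical helpers, not part of the patch):

    public class PrecedenceSketch
    {
        // specific-over-generic resolution, mirroring the order of the checks above
        static String resolve(String specificVar, String genericVar, String fallback)
        {
            String specific = System.getenv(specificVar);
            if (specific != null)
                return specific;                  // PIG_INPUT_RPC_PORT beats PIG_RPC_PORT
            String generic = System.getenv(genericVar);
            return generic != null ? generic : fallback;
        }

        public static void main(String[] args)
        {
            System.out.println(resolve("PIG_INPUT_RPC_PORT", "PIG_RPC_PORT", "unset"));
        }
    }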
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 06d83dd..6991958 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -92,7 +92,7 @@ public class SSTableLoader implements StreamEventHandler
return false;
}
- CFMetaData metadata = client.getCFMetaData(keyspace, desc.cfname);
+ CFMetaData metadata = client.getTableMetadata(desc.cfname);
if (metadata == null)
{
outputHandler.output(String.format("Skipping file %s: table %s.%s doesn't exist", name, keyspace, desc.cfname));
@@ -251,7 +251,9 @@ public class SSTableLoader implements StreamEventHandler
/**
* Stop the client.
*/
- public void stop() {}
+ public void stop()
+ {
+ }
/**
* Provides connection factory.
@@ -268,7 +270,12 @@ public class SSTableLoader implements StreamEventHandler
* Return the metadata for table {@code tableName} in the keyspace this
* client was initialized with, or null if it does not exist.
*/
- public abstract CFMetaData getCFMetaData(String keyspace, String cfName);
+ public abstract CFMetaData getTableMetadata(String tableName);
+
+ public void setTableMetadata(CFMetaData cfm)
+ {
+ throw new RuntimeException("setTableMetadata is not supported by this client");
+ }
public Map<InetAddress, Collection<Range<Token>>> getEndpointToRangesMap()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d6ce46e..c17d2d7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4117,8 +4117,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
SSTableLoader.Client client = new SSTableLoader.Client()
{
+ private String keyspace;
+
public void init(String keyspace)
{
+ this.keyspace = keyspace;
try
{
setPartitioner(DatabaseDescriptor.getPartitioner());
@@ -4135,14 +4138,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
- public CFMetaData getCFMetaData(String keyspace, String cfName)
+ public CFMetaData getTableMetadata(String tableName)
{
- return Schema.instance.getCFMetaData(keyspace, cfName);
+ return Schema.instance.getCFMetaData(keyspace, tableName);
}
};
- SSTableLoader loader = new SSTableLoader(dir, client, new OutputHandler.LogOutput());
- return loader.stream();
+ return new SSTableLoader(dir, client, new OutputHandler.LogOutput()).stream();
}
public void rescheduleFailedDeletions()
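The net effect of the Client API change above: init(String) now records the keyspace once, getTableMetadata(String) resolves against it (as the StorageService client shows), and the new setTableMetadata(CFMetaData) hook defaults to throwing, leaving it to subclasses that receive their metadata from outside to override it. A sketch of such a subclass (ExternalClient is hypothetical, not part of this commit):

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.io.sstable.SSTableLoader;

    // Hypothetical client that discovers the schema out of process and injects it,
    // the kind of caller the setTableMetadata hook appears intended for; in-process
    // clients never call that hook, which is why the base-class default throws.
    class ExternalClient extends SSTableLoader.Client
    {
        private CFMetaData tableMetadata;

        public void init(String keyspace)
        {
            // connect, read the schema for the target keyspace, then cache the
            // metadata for the table being streamed (details depend on the driver)
        }

        @Override
        public void setTableMetadata(CFMetaData cfm)
        {
            this.tableMetadata = cfm;
        }

        @Override
        public CFMetaData getTableMetadata(String tableName)
        {
            return tableMetadata;
        }
    }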