Posted to commits@cassandra.apache.org by br...@apache.org on 2012/02/29 21:43:11 UTC
[6/12] git commit: Merge from 1.0
Merge from 1.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/470873fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/470873fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/470873fc
Branch: refs/heads/cassandra-1.1.0
Commit: 470873fcb5c680b880fbe27fb20fa0151d4f1b61
Parents: 0447837 4311692
Author: Brandon Williams <br...@apache.org>
Authored: Wed Feb 29 14:41:29 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Feb 29 14:41:29 2012 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
examples/pig/test/populate-cli.txt | 20 +++++++++
examples/pig/test/test_storage.pig | 21 ++++++++++
.../db/marshal/AbstractCompositeType.java | 32 +++++++++++++++
.../cassandra/hadoop/pig/CassandraStorage.java | 31 ++++++++++++++-
5 files changed, 105 insertions(+), 1 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/470873fc/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8350252,d23ff9c..76bb8cb
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -26,89 -3,10 +26,91 @@@ Merged from 1.0
* delete hints from dropped ColumnFamilies on handoff instead of
erroring out (CASSANDRA-3975)
* add CompositeType ref to the CLI doc for create/update column family (CASSANDRA-3980)
+ * Pig: support Counter ColumnFamilies (CASSANDRA-3973)
+ * Pig: Composite column support (CASSANDRA-3684)
+1.1-beta1
+ * add nodetool rebuild_index (CASSANDRA-3583)
+ * add nodetool rangekeysample (CASSANDRA-2917)
+ * Fix streaming too much data during move operations (CASSANDRA-3639)
+ * Nodetool and CLI connect to localhost by default (CASSANDRA-3568)
+ * Reduce memory used by primary index sample (CASSANDRA-3743)
+ * (Hadoop) separate input/output configurations (CASSANDRA-3197, 3765)
+ * avoid returning internal Cassandra classes over JMX (CASSANDRA-2805)
+ * add row-level isolation via SnapTree (CASSANDRA-2893)
+ * Optimize key count estimation when opening sstable on startup
+ (CASSANDRA-2988)
+ * multi-dc replication optimization supporting CL > ONE (CASSANDRA-3577)
+ * add command to stop compactions (CASSANDRA-1740, 3566, 3582)
+ * multithreaded streaming (CASSANDRA-3494)
+ * removed in-tree redhat spec (CASSANDRA-3567)
+ * "defragment" rows for name-based queries under STCS, again (CASSANDRA-2503)
+ * Recycle commitlog segments for improved performance
+ (CASSANDRA-3411, 3543, 3557, 3615)
+ * update size-tiered compaction to prioritize small tiers (CASSANDRA-2407)
+ * add message expiration logic to OutboundTcpConnection (CASSANDRA-3005)
+ * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)
+ * EACH_QUORUM is only supported for writes (CASSANDRA-3272)
+ * replace compactionlock use in schema migration by checking CFS.isValid
+ (CASSANDRA-3116)
+ * recognize that "SELECT first ... *" isn't really "SELECT *" (CASSANDRA-3445)
+ * Use faster bytes comparison (CASSANDRA-3434)
+ * Bulk loader is no longer a fat client, (Hadoop) bulk load output format
+ (CASSANDRA-3045)
+ * (Hadoop) add support for KeyRange.filter
+ * remove assumption that keys and token are in bijection
+ (CASSANDRA-1034, 3574, 3604)
+ * always remove endpoints from delivery queue in HH (CASSANDRA-3546)
+ * fix race between cf flush and its 2ndary indexes flush (CASSANDRA-3547)
+ * fix potential race in AES when a repair fails (CASSANDRA-3548)
+ * Remove columns shadowed by a deleted container even when we cannot purge
+ (CASSANDRA-3538)
+ * Improve memtable slice iteration performance (CASSANDRA-3545)
+ * more efficient allocation of small bloom filters (CASSANDRA-3618)
+ * Use separate writer thread in SSTableSimpleUnsortedWriter (CASSANDRA-3619)
+ * fsync the directory after a new sstable or commitlog segment is created (CASSANDRA-3250)
+ * fix minor issues reported by FindBugs (CASSANDRA-3658)
+ * global key/row caches (CASSANDRA-3143, 3849)
+ * optimize memtable iteration during range scan (CASSANDRA-3638)
+ * introduce 'crc_check_chance' in CompressionParameters to support
+ checking a percentage of checksums on read, similar to read-repair chance (CASSANDRA-3611)
+ * a way to deactivate global key/row cache on per-CF basis (CASSANDRA-3667)
+ * fix LeveledCompactionStrategy broken because of generation pre-allocation
+ in LeveledManifest (CASSANDRA-3691)
+ * finer-grained control over data directories (CASSANDRA-2749)
+ * Fix ClassCastException during hinted handoff (CASSANDRA-3694)
+ * Upgrade Thrift to 0.7 (CASSANDRA-3213)
+ * Make stress.java insert operation use microseconds (CASSANDRA-3725)
+ * Allow (internally) range queries with a limit on columns instead of
+ rows (CASSANDRA-3742)
+ * Allow rangeSlice queries to be start/end inclusive/exclusive (CASSANDRA-3749)
+ * Fix BulkLoader to support new SSTable layout and add stream
+ throttling to prevent an NPE when there is no yaml config (CASSANDRA-3752)
+ * Allow concurrent schema migrations (CASSANDRA-1391, 3832)
+ * Add SnapshotCommand to trigger snapshot on remote node (CASSANDRA-3721)
+ * Make CFMetaData conversions to/from thrift/native schema inverses
+ (CASSANDRA-3559)
+ * Add initial code for CQL 3.0-beta (CASSANDRA-3781, 3753)
+ * Add wide row support for ColumnFamilyInputFormat (CASSANDRA-3264)
+ * Allow extending CompositeType comparator (CASSANDRA-3657)
+ * Avoid over-paging during get_count (CASSANDRA-3798)
+ * Add new command to rebuild a node without (repair) merkle tree calculations
+ (CASSANDRA-3483, 3922)
+ * respect not only row cache capacity but caching mode when
+ trying to read data (CASSANDRA-3812)
+ * fix system tests (CASSANDRA-3827)
+ * CQL support for altering key_validation_class in ALTER TABLE (CASSANDRA-3781)
+ * turn compression on by default (CASSANDRA-3871)
+ * make hexToBytes refuse invalid input (CASSANDRA-2851)
+ * Make secondary indexes CF inherit compression and compaction from their
+ parent CF (CASSANDRA-3877)
+ * Finish cleaning up tombstone purge code (CASSANDRA-3872)
+ * Avoid NPE on aborted stream-out sessions (CASSANDRA-3904)
+ * BulkRecordWriter throws NPE for counter columns (CASSANDRA-3906)
+ * Support compression using BulkWriter (CASSANDRA-3907)
+
+
1.0.8
* fix race between cleanup and flush on secondary index CFSes (CASSANDRA-3712)
* avoid including non-queried nodes in rangeslice read repair
http://git-wip-us.apache.org/repos/asf/cassandra/blob/470873fc/examples/pig/test/populate-cli.txt
----------------------------------------------------------------------
diff --cc examples/pig/test/populate-cli.txt
index 0164afe,0000000..c8124dd
mode 100644,000000..100644
--- a/examples/pig/test/populate-cli.txt
+++ b/examples/pig/test/populate-cli.txt
@@@ -1,88 -1,0 +1,108 @@@
+create keyspace PigTest with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options={replication_factor:1};
+use PigTest;
+create column family SomeApp with
+key_validation_class = UTF8Type and
+default_validation_class = LexicalUUIDType and
+comparator = UTF8Type and
+column_metadata =
+[
+ {column_name: name, validation_class: UTF8Type, index_type: KEYS},
+ {column_name: vote_type, validation_class: UTF8Type},
+ {column_name: rating, validation_class: IntegerType},
+ {column_name: score, validation_class: LongType},
+ {column_name: percent, validation_class: FloatType},
+ {column_name: atomic_weight, validation_class: DoubleType},
+];
+
+create column family CopyOfSomeApp with
+key_validation_class = UTF8Type and
+default_validation_class = LexicalUUIDType and
+comparator = UTF8Type and
+column_metadata =
+[
+ {column_name: name, validation_class: UTF8Type, index_type: KEYS},
+ {column_name: vote_type, validation_class: UTF8Type},
+ {column_name: rating, validation_class: IntegerType},
+ {column_name: score, validation_class: LongType},
+ {column_name: percent, validation_class: FloatType},
+ {column_name: atomic_weight, validation_class: DoubleType},
+];
+
+set SomeApp['foo']['name'] = 'User Foo';
+set SomeApp['foo']['vote_type'] = 'like';
+set SomeApp['foo']['rating'] = 8;
+set SomeApp['foo']['score'] = 125000;
+set SomeApp['foo']['percent'] = '85.0';
+set SomeApp['foo']['atomic_weight'] = '2.7182818284590451';
+
+set SomeApp['bar']['name'] = 'User Bar';
+set SomeApp['bar']['vote_type'] = 'like';
+set SomeApp['bar']['rating'] = 9;
+set SomeApp['bar']['score'] = 15000;
+set SomeApp['bar']['percent'] = '35.0';
+set SomeApp['bar']['atomic_weight'] = '3.1415926535897931';
+
+set SomeApp['baz']['name'] = 'User Baz';
+set SomeApp['baz']['vote_type'] = 'dislike';
+set SomeApp['baz']['rating'] = 3;
+set SomeApp['baz']['score'] = 512000;
+set SomeApp['baz']['percent'] = '95.3';
+set SomeApp['baz']['atomic_weight'] = '1.61803399';
+set SomeApp['baz']['extra1'] = lexicaluuid();
+set SomeApp['baz']['extra2'] = lexicaluuid();
+set SomeApp['baz']['extra3'] = lexicaluuid();
+
+set SomeApp['qux']['name'] = 'User Qux';
+set SomeApp['qux']['vote_type'] = 'dislike';
+set SomeApp['qux']['rating'] = 2;
+set SomeApp['qux']['score'] = 12000;
+set SomeApp['qux']['percent'] = '64.7';
+set SomeApp['qux']['atomic_weight'] = '0.660161815846869';
+set SomeApp['qux']['extra1'] = lexicaluuid();
+set SomeApp['qux']['extra2'] = lexicaluuid();
+set SomeApp['qux']['extra3'] = lexicaluuid();
+set SomeApp['qux']['extra4'] = lexicaluuid();
+set SomeApp['qux']['extra5'] = lexicaluuid();
+set SomeApp['qux']['extra6'] = lexicaluuid();
+set SomeApp['qux']['extra7'] = lexicaluuid();
+
+create column family U8 with
+ key_validation_class = UTF8Type and
+ comparator = UTF8Type;
+
+create column family Bytes with
+ key_validation_class = BytesType and
+ comparator = UTF8Type;
+
+set U8['foo']['x'] = ascii('Z');
+set Bytes[ascii('foo')]['x'] = ascii('Z');
+
+create column family CC with
+ key_validation_class = UTF8Type and
+ default_validation_class=CounterColumnType
+ and comparator=UTF8Type;
+
+incr CC['chuck']['kick'];
+incr CC['chuck']['kick'];
+incr CC['chuck']['kick'];
+incr CC['chuck']['fist'];
++
++create column family Compo
++ with key_validation_class = UTF8Type
++ and default_validation_class = UTF8Type
++ and comparator = 'CompositeType(UTF8Type,UTF8Type)';
++
++set Compo['punch']['bruce:lee'] = 'ouch';
++set Compo['punch']['bruce:bruce'] = 'hunh?';
++set Compo['kick']['bruce:lee'] = 'oww';
++set Compo['kick']['bruce:bruce'] = 'watch it, mate';
++
++create column family CompoInt
++ with key_validation_class = UTF8Type
++ and default_validation_class = UTF8Type
++ and comparator = 'CompositeType(LongType,LongType)';
++
++set CompoInt['clock']['1:0'] = 'z';
++set CompoInt['clock']['1:30'] = 'zzzz';
++set CompoInt['clock']['2:30'] = 'daddy?';
++set CompoInt['clock']['6:30'] = 'coffee...';
http://git-wip-us.apache.org/repos/asf/cassandra/blob/470873fc/examples/pig/test/test_storage.pig
----------------------------------------------------------------------
diff --cc examples/pig/test/test_storage.pig
index c49d4b3,0000000..a0157f7
mode 100644,000000..100644
--- a/examples/pig/test/test_storage.pig
+++ b/examples/pig/test/test_storage.pig
@@@ -1,49 -1,0 +1,70 @@@
+rows = LOAD 'cassandra://PigTest/SomeApp' USING CassandraStorage();
+-- full copy
+STORE rows INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+-- single tuple
+onecol = FOREACH rows GENERATE key, percent;
+STORE onecol INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+-- bag only
+other = FOREACH rows GENERATE key, columns;
+STORE other INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+
+
+-- filter
+likes = FILTER rows by vote_type.value eq 'like' and rating.value > 5;
+dislikes_extras = FILTER rows by vote_type.value eq 'dislike' AND COUNT(columns) > 0;
+
+-- store these too
+STORE likes INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+STORE dislikes_extras INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+
+-- filter to fully visible rows (no uuid columns) and dump
+visible = FILTER rows BY COUNT(columns) == 0;
+dump visible;
+
+
+
+-- test key types with a join
+U8 = load 'cassandra://PigTest/U8' using CassandraStorage();
+Bytes = load 'cassandra://PigTest/Bytes' using CassandraStorage();
+
+-- cast key to chararray
+b = foreach Bytes generate (chararray)key, columns;
+
+-- key in Bytes is a bytearray; in U8 it is a chararray
+a = join Bytes by key, U8 by key;
+dump a;
+
+-- key should now be cast into a chararray
+c = join b by (chararray)key, U8 by (chararray)key;
+dump c;
+
+
+--
+-- Test counter column family support
+--
+CC = load 'cassandra://PigTest/CC' using CassandraStorage();
+
+total_hits = foreach CC generate key, SUM(columns.value);
+
+dump total_hits;
++
++--
++-- Test CompositeType
++--
++
++compo = load 'cassandra://PigTest/Compo' using CassandraStorage();
++
++compo = foreach compo generate key as method, flatten(columns);
++
++lee = filter compo by columns::name == ('bruce','lee');
++
++dump lee;
++
++night = load 'cassandra://PigTest/CompoInt' using CassandraStorage();
++night = foreach night generate flatten(columns);
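++-- hour = first component + second component/60, e.g. (2,30) -> 2 + 30/60 = 2.5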
++night = foreach night generate (int)columns::name.$0+(double)columns::name.$1/60 as hour, columns::value as noise;
++
++-- What happens at the darkest hour?
++darkest = filter night by hour > 2 and hour < 5;
++
++dump darkest;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/470873fc/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/470873fc/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index bb87665,0000000..f10dde5
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@@ -1,834 -1,0 +1,863 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.cassandra.hadoop.pig;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.ConfigurationException;
++import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.marshal.*;
++import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
+import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.Deletion;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.*;
+
+import org.apache.pig.*;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.*;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+/**
+ * A LoadStoreFunc for retrieving data from and storing data to Cassandra
+ *
+ * A row from a standard CF will be returned as nested tuples: (key, (name1, val1), (name2, val2), ..., {(other_name, other_val), ...}).
+ */
+public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
+{
+ // system environment variables that can be set to configure connection info:
+ // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
+ public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
+ public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
+ public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
+ public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
+ public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
+ public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
+ public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
+ public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
+ public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+ public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
+ public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
+ public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
+
+ private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
+ private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
+
+ private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ private static final Log logger = LogFactory.getLog(CassandraStorage.class);
+
+ private ByteBuffer slice_start = BOUND;
+ private ByteBuffer slice_end = BOUND;
+ private boolean slice_reverse = false;
+ private boolean allow_deletes = false;
+ private String keyspace;
+ private String column_family;
+ private String loadSignature;
+ private String storeSignature;
+
+ private Configuration conf;
+ private RecordReader<ByteBuffer, Map<ByteBuffer, IColumn>> reader;
+ private RecordWriter<ByteBuffer, List<Mutation>> writer;
+ private String inputFormatClass;
+ private String outputFormatClass;
+ private int limit;
+
+ public CassandraStorage()
+ {
+ this(1024);
+ }
+
+ /**
+ * @param limit number of columns to fetch in a slice
+ */
+ public CassandraStorage(int limit)
+ {
+ super();
+ this.limit = limit;
+ }
+
+ public int getLimit()
+ {
+ return limit;
+ }
+
+ @Override
+ public Tuple getNext() throws IOException
+ {
+ try
+ {
+ // load the next pair
+ if (!reader.nextKeyValue())
+ return null;
+
+ CfDef cfDef = getCfDef(loadSignature);
+ ByteBuffer key = reader.getCurrentKey();
+ Map<ByteBuffer, IColumn> cf = reader.getCurrentValue();
+ assert key != null && cf != null;
+
+ // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
+ // NOTE: we're setting the tuple size here only for the key so we can use setTupleValue on it
+ Tuple tuple = TupleFactory.getInstance().newTuple(1);
+ DefaultDataBag bag = new DefaultDataBag();
+ // set the key
+ setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(2).compose(key));
+ // we must add all the indexed columns first to match the schema
+ Map<ByteBuffer, Boolean> added = new HashMap<ByteBuffer, Boolean>();
+ // take care to iterate these in the same order as the schema does
+ for (ColumnDef cdef : cfDef.column_metadata)
+ {
+ if (cf.containsKey(cdef.name))
+ {
+ tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
+ }
+ else
+ { // otherwise, we need to add an empty tuple to take its place
+ tuple.append(TupleFactory.getInstance().newTuple());
+ }
+ added.put(cdef.name, true);
+ }
+ // now add all the other columns
+ for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
+ {
+ if (!added.containsKey(entry.getKey()))
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+ }
+ tuple.append(bag);
+ return tuple;
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e.getMessage());
+ }
+ }
+
++ /**
++ * Deconstructs a composite column name into a Pig Tuple, one field per component.
++ */
++ private Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException
++ {
++ List<CompositeComponent> result = comparator.deconstruct(name);
++
++ Tuple t = TupleFactory.getInstance().newTuple(result.size());
++
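++ // compose each component with its own comparator so Pig sees a native type (chararray, long, ...)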
++ for (int i = 0; i < result.size(); i++)
++ {
++ setTupleValue(t, i, result.get(i).comparator.compose(result.get(i).value));
++ }
++
++ return t;
++ }
++
+ private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
+ {
+ Tuple pair = TupleFactory.getInstance().newTuple(2);
+ List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+
- setTupleValue(pair, 0, comparator.compose(col.name()));
++ if (comparator instanceof AbstractCompositeType)
++ {
++ setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, col.name()));
++ }
++ else
++ {
++ setTupleValue(pair, 0, comparator.compose(col.name()));
++ }
+ if (col instanceof Column)
+ {
+ // standard
+ if (validators.get(col.name()) == null)
+ setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
+ else
+ setTupleValue(pair, 1, validators.get(col.name()).compose(col.value()));
+ return pair;
+ }
+ else
+ {
+ // super
+ ArrayList<Tuple> subcols = new ArrayList<Tuple>();
+ for (IColumn subcol : col.getSubColumns())
+ subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type())));
+
+ pair.set(1, new DefaultDataBag(subcols));
+ }
+ return pair;
+ }
+
+ private void setTupleValue(Tuple pair, int position, Object value) throws ExecException
+ {
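+ // IntegerType composes to BigInteger; narrow it so Pig gets a plain int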
+ if (value instanceof BigInteger)
+ pair.set(position, ((BigInteger) value).intValue());
+ else if (value instanceof ByteBuffer)
+ pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
+ else if (value instanceof UUID)
+ pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value)));
+ else
+ pair.set(position, value);
+ }
+
+ private CfDef getCfDef(String signature)
+ {
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(CassandraStorage.class);
+ return cfdefFromString(property.getProperty(signature));
+ }
+
+ private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
+ {
+ ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>();
+ AbstractType comparator;
+ AbstractType subcomparator;
+ AbstractType default_validator;
+ AbstractType key_validator;
+
+ comparator = parseType(cfDef.getComparator_type());
+ subcomparator = parseType(cfDef.getSubcomparator_type());
+ default_validator = parseType(cfDef.getDefault_validation_class());
+ key_validator = parseType(cfDef.getKey_validation_class());
+
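+ // order matters to callers: 0 = comparator, 1 = default validator, 2 = key validator, 3 = subcomparator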
+ marshallers.add(comparator);
+ marshallers.add(default_validator);
+ marshallers.add(key_validator);
+ marshallers.add(subcomparator);
+ return marshallers;
+ }
+
+ private Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
+ {
+ Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
+ for (ColumnDef cd : cfDef.getColumn_metadata())
+ {
+ if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
+ {
+ AbstractType validator = null;
+ try
+ {
+ validator = TypeParser.parse(cd.getValidation_class());
+ validators.put(cd.name, validator);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+ }
+ return validators;
+ }
+
+ private AbstractType parseType(String type) throws IOException
+ {
+ try
+ {
+ // always treat counters like longs, specifically CCT.compose is not what we need
+ if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
+ return LongType.instance;
+ return TypeParser.parse(type);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public InputFormat getInputFormat()
+ {
+ try
+ {
+ return FBUtilities.construct(inputFormatClass, "inputformat");
+ }
+ catch (ConfigurationException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split)
+ {
+ this.reader = reader;
+ }
+
+ public static Map<String, String> getQueryMap(String query)
+ {
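+ // splits "key1=val1&key2=val2"; assumes every parameter has a value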
+ String[] params = query.split("&");
+ Map<String, String> map = new HashMap<String, String>();
+ for (String param : params)
+ {
+ String[] keyValue = param.split("=");
+ map.put(keyValue[0], keyValue[1]);
+ }
+ return map;
+ }
+
+ private void setLocationFromUri(String location) throws IOException
+ {
+ // parse uri into keyspace and columnfamily
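+ // e.g. cassandra://PigTest/SomeApp?comparator=UTF8Type&slice_start=a&slice_end=z&limit=100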
+ String names[];
+ try
+ {
+ if (!location.startsWith("cassandra://"))
+ throw new Exception("Bad scheme.");
+ String[] urlParts = location.split("\\?");
+ if (urlParts.length > 1)
+ {
+ Map<String, String> urlQuery = getQueryMap(urlParts[1]);
+ AbstractType comparator = BytesType.instance;
+ if (urlQuery.containsKey("comparator"))
+ comparator = TypeParser.parse(urlQuery.get("comparator"));
+ if (urlQuery.containsKey("slice_start"))
+ slice_start = comparator.fromString(urlQuery.get("slice_start"));
+ if (urlQuery.containsKey("slice_end"))
+ slice_end = comparator.fromString(urlQuery.get("slice_end"));
+ if (urlQuery.containsKey("reversed"))
+ slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
+ if (urlQuery.containsKey("limit"))
+ limit = Integer.parseInt(urlQuery.get("limit"));
+ }
+ String[] parts = urlParts[0].split("/+");
+ keyspace = parts[1];
+ column_family = parts[2];
+ }
+ catch (Exception e)
+ {
+ throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage());
+ }
+ }
+
+ private void setConnectionInformation() throws IOException
+ {
+ if (System.getenv(PIG_RPC_PORT) != null)
+ {
+ ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
+ ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
+ }
+
+ if (System.getenv(PIG_INPUT_RPC_PORT) != null)
+ ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT));
+ if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
+ ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT));
+
+ if (System.getenv(PIG_INITIAL_ADDRESS) != null)
+ {
+ ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+ ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+ }
+ if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
+ ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS));
+ if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
+ ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
+
+ if (System.getenv(PIG_PARTITIONER) != null)
+ {
+ ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
+ ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
+ }
+ if(System.getenv(PIG_INPUT_PARTITIONER) != null)
+ ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
+ if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
+ ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
+ if (System.getenv(PIG_INPUT_FORMAT) != null)
+ inputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_INPUT_FORMAT));
+ else
+ inputFormatClass = DEFAULT_INPUT_FORMAT;
+ if (System.getenv(PIG_OUTPUT_FORMAT) != null)
+ outputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_OUTPUT_FORMAT));
+ else
+ outputFormatClass = DEFAULT_OUTPUT_FORMAT;
+ if (System.getenv(PIG_ALLOW_DELETES) != null)
+ allow_deletes = Boolean.valueOf(System.getenv(PIG_ALLOW_DELETES));
+ }
+
+ private String getFullyQualifiedClassName(String classname)
+ {
+ return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
+ }
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException
+ {
+ conf = job.getConfiguration();
+ setLocationFromUri(location);
+ if (ConfigHelper.getInputSlicePredicate(conf) == null)
+ {
+ SliceRange range = new SliceRange(slice_start, slice_end, slice_reverse, limit);
+ SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
+ ConfigHelper.setInputSlicePredicate(conf, predicate);
+ }
+ ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
+ setConnectionInformation();
+
+ if (ConfigHelper.getInputRpcPort(conf) == 0)
+ throw new IOException("PIG_INPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
+ if (ConfigHelper.getInputInitialAddress(conf) == null)
+ throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+ if (ConfigHelper.getInputPartitioner(conf) == null)
+ throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+ if (loadSignature == null)
+ loadSignature = location;
+ initSchema(loadSignature);
+ }
+
+ public ResourceSchema getSchema(String location, Job job) throws IOException
+ {
+ setLocation(location, job);
+ CfDef cfDef = getCfDef(loadSignature);
+
+ if (cfDef.column_type.equals("Super"))
+ return null;
+ /*
+ Our returned schema should look like this:
+ (key, index1:(name, value), index2:(name, value), columns:{(name, value)})
+ Which is to say, columns that have metadata will be returned as named tuples, but unknown columns will go into a bag.
+ This way, wide rows can still be handled by the bag, but known columns can easily be referenced.
+ */
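+ // e.g. in Pig (cf. examples/pig/test/test_storage.pig):
+ //   rows = LOAD 'cassandra://PigTest/SomeApp' USING CassandraStorage();
+ //   onecol = FOREACH rows GENERATE key, percent; -- a column declared in column_metadata
+ //   other = FOREACH rows GENERATE key, columns;  -- everything else via the columns bag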
+
+ // top-level schema, no type
+ ResourceSchema schema = new ResourceSchema();
+
+ // get default marshallers and validators
+ List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+
+ // add key
+ ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
+ keyFieldSchema.setName("key");
+ keyFieldSchema.setType(getPigType(marshallers.get(2)));
+
+ ResourceSchema bagSchema = new ResourceSchema();
+ ResourceFieldSchema bagField = new ResourceFieldSchema();
+ bagField.setType(DataType.BAG);
+ bagField.setName("columns");
+ // inside the bag, place one tuple with the default comparator/validator schema
+ ResourceSchema bagTupleSchema = new ResourceSchema();
+ ResourceFieldSchema bagTupleField = new ResourceFieldSchema();
+ bagTupleField.setType(DataType.TUPLE);
+ ResourceFieldSchema bagcolSchema = new ResourceFieldSchema();
+ ResourceFieldSchema bagvalSchema = new ResourceFieldSchema();
+ bagcolSchema.setName("name");
+ bagvalSchema.setName("value");
+ bagcolSchema.setType(getPigType(marshallers.get(0)));
+ bagvalSchema.setType(getPigType(marshallers.get(1)));
+ bagTupleSchema.setFields(new ResourceFieldSchema[] { bagcolSchema, bagvalSchema });
+ bagTupleField.setSchema(bagTupleSchema);
+ bagSchema.setFields(new ResourceFieldSchema[] { bagTupleField });
+ bagField.setSchema(bagSchema);
+
+ // will contain all fields for this schema
+ List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
+ // add the key first, then the indexed columns, and finally the bag
+ allSchemaFields.add(keyFieldSchema);
+
+ // defined validators/indexes
+ for (ColumnDef cdef : cfDef.column_metadata)
+ {
+ // make a new tuple for each col/val pair
+ ResourceSchema innerTupleSchema = new ResourceSchema();
+ ResourceFieldSchema innerTupleField = new ResourceFieldSchema();
+ innerTupleField.setType(DataType.TUPLE);
+ innerTupleField.setSchema(innerTupleSchema);
+ innerTupleField.setName(new String(cdef.getName()));
+
+ ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
+ idxColSchema.setName("name");
+ idxColSchema.setType(getPigType(marshallers.get(0)));
+
+ ResourceFieldSchema valSchema = new ResourceFieldSchema();
+ AbstractType validator = validators.get(cdef.name);
+ if (validator == null)
+ validator = marshallers.get(1);
+ valSchema.setName("value");
+ valSchema.setType(getPigType(validator));
+
+ innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
+ allSchemaFields.add(innerTupleField);
+ }
+ // bag at the end for unknown columns
+ allSchemaFields.add(bagField);
+
+ // top level schema contains everything
+ schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
+ return schema;
+ }
+
+ private byte getPigType(AbstractType type)
+ {
+ if (type instanceof LongType)
+ return DataType.LONG;
+ else if (type instanceof IntegerType)
+ return DataType.INTEGER;
+ else if (type instanceof AsciiType)
+ return DataType.CHARARRAY;
+ else if (type instanceof UTF8Type)
+ return DataType.CHARARRAY;
+ else if (type instanceof FloatType)
+ return DataType.FLOAT;
+ else if (type instanceof DoubleType)
+ return DataType.DOUBLE;
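++ // composite comparators deconstruct into Pig tuples (see composeComposite)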
++ else if (type instanceof AbstractCompositeType)
++ return DataType.TUPLE;
++
+ return DataType.BYTEARRAY;
+ }
+
+ public ResourceStatistics getStatistics(String location, Job job)
+ {
+ return null;
+ }
+
+ public String[] getPartitionKeys(String location, Job job)
+ {
+ return null;
+ }
+
+ public void setPartitionFilter(Expression partitionFilter)
+ {
+ // no-op
+ }
+
+ @Override
+ public String relativeToAbsolutePath(String location, Path curDir) throws IOException
+ {
+ return location;
+ }
+
+ @Override
+ public void setUDFContextSignature(String signature)
+ {
+ this.loadSignature = signature;
+ }
+
+ /* StoreFunc methods */
+ public void setStoreFuncUDFContextSignature(String signature)
+ {
+ this.storeSignature = signature;
+ }
+
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+ {
+ return relativeToAbsolutePath(location, curDir);
+ }
+
+ public void setStoreLocation(String location, Job job) throws IOException
+ {
+ conf = job.getConfiguration();
+ setLocationFromUri(location);
+ ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
+ setConnectionInformation();
+
+ if (ConfigHelper.getOutputRpcPort(conf) == 0)
+ throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
+ if (ConfigHelper.getOutputInitialAddress(conf) == null)
+ throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+ if (ConfigHelper.getOutputPartitioner(conf) == null)
+ throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+
+ initSchema(storeSignature);
+ }
+
+ public OutputFormat getOutputFormat()
+ {
+ try
+ {
+ return FBUtilities.construct(outputFormatClass, "outputformat");
+ }
+ catch (ConfigurationException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void checkSchema(ResourceSchema schema) throws IOException
+ {
+ // we don't care about types, they all get cast to ByteBuffers
+ }
+
+ public void prepareToWrite(RecordWriter writer)
+ {
+ this.writer = writer;
+ }
+
+ private ByteBuffer objToBB(Object o)
+ {
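+ // serialize a Pig value to the ByteBuffer Cassandra expects; null is passed through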
+ if (o == null)
+ return (ByteBuffer)o;
+ if (o instanceof java.lang.String)
+ return ByteBuffer.wrap(new DataByteArray((String)o).get());
+ if (o instanceof Integer)
+ return Int32Type.instance.decompose((Integer)o);
+ if (o instanceof Long)
+ return LongType.instance.decompose((Long)o);
+ if (o instanceof Float)
+ return FloatType.instance.decompose((Float)o);
+ if (o instanceof Double)
+ return DoubleType.instance.decompose((Double)o);
+ if (o instanceof UUID)
+ return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
+
+ return ByteBuffer.wrap(((DataByteArray) o).get());
+ }
+
+ public void putNext(Tuple t) throws IOException
+ {
+ /*
+ We support two cases for output:
+ First, the original output:
+ (key, (name, value), (name, value), {(name, value)}) (tuples or bag is optional)
+ Second, a bag of columns on its own:
+ (key, {(name, value)})
+ For supers, we only accept the original output.
+ */
+
+ if (t.size() < 1)
+ {
+ // simply nothing here, we can't even delete without a key
+ logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
+ return;
+ }
+ ByteBuffer key = objToBB(t.get(0));
+ if (t.getType(1) == DataType.TUPLE)
+ writeColumnsFromTuple(key, t, 1);
+ else if (t.getType(1) == DataType.BAG)
+ {
+ if (t.size() > 2)
+ throw new IOException("No arguments allowed after bag");
+ writeColumnsFromBag(key, (DefaultDataBag) t.get(1));
+ }
+ else
+ throw new IOException("Second argument in output must be a tuple or bag");
+ }
+
+ private void writeColumnsFromTuple(ByteBuffer key, Tuple t, int offset) throws IOException
+ {
+ ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
+ for (int i = offset; i < t.size(); i++)
+ {
+ if (t.getType(i) == DataType.BAG)
+ writeColumnsFromBag(key, (DefaultDataBag) t.get(i));
+ else if (t.getType(i) == DataType.TUPLE)
+ {
+ Tuple inner = (Tuple) t.get(i);
+ if (inner.size() > 0) // may be empty, for an indexed column that wasn't present
+ mutationList.add(mutationFromTuple(inner));
+ }
+ else
+ throw new IOException("Output type was not a bag or a tuple");
+ }
+ if (mutationList.size() > 0)
+ writeMutations(key, mutationList);
+ }
+
+ private Mutation mutationFromTuple(Tuple t) throws IOException
+ {
+ Mutation mutation = new Mutation();
+ if (t.get(1) == null)
+ {
+ if (allow_deletes)
+ {
+ mutation.deletion = new Deletion();
+ mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate();
+ mutation.deletion.predicate.column_names = Arrays.asList(objToBB(t.get(0)));
+ mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
+ }
+ else
+ throw new IOException("null found but deletes are disabled, set " + PIG_ALLOW_DELETES + "=true to enable");
+ }
+ else
+ {
+ org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
+ column.setName(objToBB(t.get(0)));
+ column.setValue(objToBB(t.get(1)));
+ column.setTimestamp(FBUtilities.timestampMicros());
+ mutation.column_or_supercolumn = new ColumnOrSuperColumn();
+ mutation.column_or_supercolumn.column = column;
+ }
+ return mutation;
+ }
+
+ private void writeColumnsFromBag(ByteBuffer key, DefaultDataBag bag) throws IOException
+ {
+ List<Mutation> mutationList = new ArrayList<Mutation>();
+ for (Tuple pair : bag)
+ {
+ Mutation mutation = new Mutation();
+ if (DataType.findType(pair.get(1)) == DataType.BAG) // supercolumn
+ {
+ SuperColumn sc = new SuperColumn();
+ sc.setName(objToBB(pair.get(0)));
+ List<org.apache.cassandra.thrift.Column> columns = new ArrayList<org.apache.cassandra.thrift.Column>();
+ for (Tuple subcol : (DefaultDataBag) pair.get(1))
+ {
+ org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
+ column.setName(objToBB(subcol.get(0)));
+ column.setValue(objToBB(subcol.get(1)));
+ column.setTimestamp(FBUtilities.timestampMicros());
+ columns.add(column);
+ }
+ if (columns.isEmpty())
+ {
+ if (allow_deletes)
+ {
+ mutation.deletion = new Deletion();
+ mutation.deletion.super_column = objToBB(pair.get(0));
+ mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
+ }
+ else
+ throw new IOException("SuperColumn deletion attempted with empty bag, but deletes are disabled, set " + PIG_ALLOW_DELETES + "=true to enable");
+ }
+ else
+ {
+ sc.columns = columns;
+ mutation.column_or_supercolumn = new ColumnOrSuperColumn();
+ mutation.column_or_supercolumn.super_column = sc;
+ }
+ }
+ else
+ mutation = mutationFromTuple(pair);
+ mutationList.add(mutation);
+ // for wide rows, we need to limit the amount of mutations we write at once
+ if (mutationList.size() >= 10) // arbitrary, CFOF will re-batch this up, and BOF won't care
+ {
+ writeMutations(key, mutationList);
+ mutationList.clear();
+ }
+ }
+ // write the last batch
+ if (mutationList.size() > 0)
+ writeMutations(key, mutationList);
+ }
+
+ private void writeMutations(ByteBuffer key, List<Mutation> mutations) throws IOException
+ {
+ try
+ {
+ writer.write(key, mutations);
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ public void cleanupOnFailure(String failure, Job job)
+ {
+ }
+
+ /* Methods to get the column family schema from Cassandra */
+
+ private void initSchema(String signature)
+ {
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(CassandraStorage.class);
+
+ // Only get the schema if we haven't already gotten it
+ if (!property.containsKey(signature))
+ {
+ Cassandra.Client client = null;
+ try
+ {
+ client = ConfigHelper.getClientFromInputAddressList(conf);
+ CfDef cfDef = null;
+ client.set_keyspace(keyspace);
+ KsDef ksDef = client.describe_keyspace(keyspace);
+ List<CfDef> defs = ksDef.getCf_defs();
+ for (CfDef def : defs)
+ {
+ if (column_family.equalsIgnoreCase(def.getName()))
+ {
+ cfDef = def;
+ break;
+ }
+ }
+ if (cfDef != null)
+ property.setProperty(signature, cfdefToString(cfDef));
+ else
+ throw new RuntimeException("Column family '" + column_family + "' not found in keyspace '" + keyspace + "'");
+ }
+ catch (TException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InvalidRequestException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (NotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static String cfdefToString(CfDef cfDef)
+ {
+ assert cfDef != null;
+ // this is so awful it's kind of cool!
+ TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
+ try
+ {
+ return Hex.bytesToHex(serializer.serialize(cfDef));
+ }
+ catch (TException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static CfDef cfdefFromString(String st)
+ {
+ assert st != null;
+ TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+ CfDef cfDef = new CfDef();
+ try
+ {
+ deserializer.deserialize(cfDef, Hex.hexToBytes(st));
+ }
+ catch (TException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return cfDef;
+ }
+}
+