You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/06/04 12:06:54 UTC
[1/3] cassandra git commit: Remove use of Cell in Thrift M/R classes
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 16b02887d -> 193665757
refs/heads/trunk 78c03052c -> 9718e13d7
Remove use of Cell in Thrift M/R classes
Patch and review by Philip Thompson and Sam Tunnicliffe for
CASSANDRA-8609
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/19366575
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/19366575
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/19366575
Branch: refs/heads/cassandra-2.2
Commit: 1936657570655129052cc48fa373c155086a6456
Parents: 16b0288
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Tue Jun 2 15:56:36 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Thu Jun 4 10:46:57 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
examples/hadoop_cql3_word_count/README.txt | 14 +-
.../bin/word_count_counters | 1 +
.../conf/log4j.properties | 32 -
.../hadoop_cql3_word_count/conf/logback.xml | 42 +
.../hadoop_cql3_word_count/src/WordCount.java | 4 +-
examples/hadoop_word_count/README.txt | 10 +-
.../hadoop_word_count/bin/word_count_counters | 1 +
.../hadoop_word_count/conf/log4j.properties | 32 -
examples/hadoop_word_count/conf/logback.xml | 42 +
examples/hadoop_word_count/src/WordCount.java | 20 +-
.../src/WordCountCounters.java | 24 +-
.../hadoop/ColumnFamilyInputFormat.java | 7 +-
.../hadoop/ColumnFamilyRecordReader.java | 117 +--
.../hadoop/pig/AbstractCassandraStorage.java | 796 -------------------
.../cassandra/hadoop/pig/CassandraStorage.java | 35 +-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 12 +-
test/conf/logback-test.xml | 4 +-
18 files changed, 218 insertions(+), 976 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 933f5a6..882279f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2
+ * Remove use of Cell in Thrift MapReduce classes (CASSANDRA-8609)
* Integrate pre-release Java Driver 2.2-rc1, custom build (CASSANDRA-9493)
* Clean up gossiper logic for old versions (CASSANDRA-9370)
* Fix custom payload coding/decoding to match the spec (CASSANDRA-9515)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/README.txt
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/README.txt b/examples/hadoop_cql3_word_count/README.txt
index b69bdd5..b6ee33f 100644
--- a/examples/hadoop_cql3_word_count/README.txt
+++ b/examples/hadoop_cql3_word_count/README.txt
@@ -5,14 +5,16 @@ WordCount hadoop example: Inserts a bunch of words across multiple rows,
and counts them, with RandomPartitioner. The word_count_counters example sums
the value of counter columns for a key.
-The scripts in bin/ assume you are running with cwd of contrib/word_count.
+The scripts in bin/ assume you are running with cwd of examples/word_count.
Running
=======
-First build and start a Cassandra server with the default configuration*,
-then run
+First build and start a Cassandra server with the default configuration*. Ensure that the Thrift
+interface is enabled, either by setting start_rpc:true in cassandra.yaml or by running
+`nodetool enablethrift` after startup.
+Once Cassandra has started and the Thrift interface is available, run
contrib/word_count$ ant
contrib/word_count$ bin/word_count_setup
@@ -22,14 +24,14 @@ contrib/word_count$ bin/word_count_counters
In order to view the results in Cassandra, one can use bin/cqlsh and
perform the following operations:
$ bin/cqlsh localhost
-> use cql3_worldcount;
+> use cql3_wordcount;
> select * from output_words;
The output of the word count can now be configured. In the bin/word_count
file, you can specify the OUTPUT_REDUCER. The two options are 'filesystem'
and 'cassandra'. The filesystem option outputs to the /tmp/word_count*
directories. The cassandra option outputs to the 'output_words' column family
-in the 'cql3_worldcount' keyspace. 'cassandra' is the default.
+in the 'cql3_wordcount' keyspace. 'cassandra' is the default.
Read the code in src/ for more details.
@@ -45,5 +47,5 @@ settings accordingly.
Troubleshooting
===============
-word_count uses conf/log4j.properties to log to wc.out.
+word_count uses conf/logback.xml to log to wc.out.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count_counters b/examples/hadoop_cql3_word_count/bin/word_count_counters
index 0b69b40..52ea2e5 100755
--- a/examples/hadoop_cql3_word_count/bin/word_count_counters
+++ b/examples/hadoop_cql3_word_count/bin/word_count_counters
@@ -30,6 +30,7 @@ if [ ! -e $cwd/../build/word_count.jar ]; then
exit 1
fi
+CLASSPATH=$CLASSPATH:$cwd/../conf
CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main
CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/conf/log4j.properties b/examples/hadoop_cql3_word_count/conf/log4j.properties
deleted file mode 100644
index 508d60f..0000000
--- a/examples/hadoop_cql3_word_count/conf/log4j.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-log4j.rootLogger=INFO,stdout,F
-
-#stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
-
-# log file
-log4j.appender.F=org.apache.log4j.FileAppender
-log4j.appender.F.Append=false
-log4j.appender.F.layout=org.apache.log4j.PatternLayout
-log4j.appender.F.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
-# Edit the next line to point to your logs directory
-log4j.appender.F.File=wc.out
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/conf/logback.xml
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/conf/logback.xml b/examples/hadoop_cql3_word_count/conf/logback.xml
new file mode 100644
index 0000000..443bd1c
--- /dev/null
+++ b/examples/hadoop_cql3_word_count/conf/logback.xml
@@ -0,0 +1,42 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration scan="true">
+
+ <jmxConfigurator />
+
+ <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+ <file>wc.out</file>
+ <encoder>
+ <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="INFO">
+ <appender-ref ref="FILE" />
+ <appender-ref ref="STDOUT" />
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
index 3702a2b..bc95736 100644
--- a/examples/hadoop_cql3_word_count/src/WordCount.java
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -44,7 +44,7 @@ import com.datastax.driver.core.Row;
/**
* This counts the occurrences of words in ColumnFamily
- * cql3_worldcount ( id uuid,
+ * cql3_wordcount ( id uuid,
* line text,
* PRIMARY KEY (id))
*
@@ -60,7 +60,7 @@ public class WordCount extends Configured implements Tool
{
private static final Logger logger = LoggerFactory.getLogger(WordCount.class);
static final String INPUT_MAPPER_VAR = "input_mapper";
- static final String KEYSPACE = "cql3_worldcount";
+ static final String KEYSPACE = "cql3_wordcount";
static final String COLUMN_FAMILY = "inputs";
static final String OUTPUT_REDUCER_VAR = "output_reducer";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/README.txt
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/README.txt b/examples/hadoop_word_count/README.txt
index ec6f512..e336b89 100644
--- a/examples/hadoop_word_count/README.txt
+++ b/examples/hadoop_word_count/README.txt
@@ -5,14 +5,16 @@ WordCount hadoop example: Inserts a bunch of words across multiple rows,
and counts them, with RandomPartitioner. The word_count_counters example sums
the value of counter columns for a key.
-The scripts in bin/ assume you are running with cwd of contrib/word_count.
+The scripts in bin/ assume you are running with cwd of examples/word_count.
Running
=======
-First build and start a Cassandra server with the default configuration*,
-then run
+First build and start a Cassandra server with the default configuration*. Ensure that the Thrift
+interface is enabled, either by setting start_rpc:true in cassandra.yaml or by running
+`nodetool enablethrift` after startup.
+Once Cassandra has started and the Thrift interface is available, run
contrib/word_count$ ant
contrib/word_count$ bin/word_count_setup
@@ -45,4 +47,4 @@ settings accordingly.
Troubleshooting
===============
-word_count uses conf/log4j.properties to log to wc.out.
+word_count uses conf/logback.xml to log to wc.out.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/bin/word_count_counters b/examples/hadoop_word_count/bin/word_count_counters
index 7793477..58c398c 100755
--- a/examples/hadoop_word_count/bin/word_count_counters
+++ b/examples/hadoop_word_count/bin/word_count_counters
@@ -30,6 +30,7 @@ if [ ! -e $cwd/../build/word_count.jar ]; then
exit 1
fi
+CLASSPATH=$CLASSPATH:$cwd/../conf
CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main
CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/conf/log4j.properties b/examples/hadoop_word_count/conf/log4j.properties
deleted file mode 100644
index 508d60f..0000000
--- a/examples/hadoop_word_count/conf/log4j.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-log4j.rootLogger=INFO,stdout,F
-
-#stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
-
-# log file
-log4j.appender.F=org.apache.log4j.FileAppender
-log4j.appender.F.Append=false
-log4j.appender.F.layout=org.apache.log4j.PatternLayout
-log4j.appender.F.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
-# Edit the next line to point to your logs directory
-log4j.appender.F.File=wc.out
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/conf/logback.xml
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/conf/logback.xml b/examples/hadoop_word_count/conf/logback.xml
new file mode 100644
index 0000000..443bd1c
--- /dev/null
+++ b/examples/hadoop_word_count/conf/logback.xml
@@ -0,0 +1,42 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration scan="true">
+
+ <jmxConfigurator />
+
+ <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+ <file>wc.out</file>
+ <encoder>
+ <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="INFO">
+ <appender-ref ref="FILE" />
+ <appender-ref ref="STDOUT" />
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/src/WordCount.java b/examples/hadoop_word_count/src/WordCount.java
index f6bca77..d092f1f 100644
--- a/examples/hadoop_word_count/src/WordCount.java
+++ b/examples/hadoop_word_count/src/WordCount.java
@@ -20,15 +20,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
-import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -71,7 +67,7 @@ public class WordCount extends Configured implements Tool
System.exit(0);
}
- public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, Cell>, Text, IntWritable>
+ public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@@ -82,17 +78,17 @@ public class WordCount extends Configured implements Tool
{
}
- public void map(ByteBuffer key, SortedMap<ByteBuffer, Cell> columns, Context context) throws IOException, InterruptedException
+ public void map(ByteBuffer key, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> columns, Context context) throws IOException, InterruptedException
{
- for (Cell cell : columns.values())
+ for (ColumnFamilyRecordReader.Column column : columns.values())
{
- String name = ByteBufferUtil.string(cell.name().toByteBuffer());
+ String name = ByteBufferUtil.string(column.name);
String value = null;
if (name.contains("int"))
- value = String.valueOf(ByteBufferUtil.toInt(cell.value()));
+ value = String.valueOf(ByteBufferUtil.toInt(column.value));
else
- value = ByteBufferUtil.string(cell.value());
+ value = ByteBufferUtil.string(column.value);
logger.debug("read {}:{}={} from {}",
new Object[] {ByteBufferUtil.string(key), name, value, context.getInputSplit()});
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/src/WordCountCounters.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/src/WordCountCounters.java b/examples/hadoop_word_count/src/WordCountCounters.java
index 39fb778..98c8579 100644
--- a/examples/hadoop_word_count/src/WordCountCounters.java
+++ b/examples/hadoop_word_count/src/WordCountCounters.java
@@ -20,26 +20,26 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.SortedMap;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.thrift.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.ColumnFamilyRecordReader;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
/**
* This sums the word count stored in the input_words_count ColumnFamily for the key "key-if-verse1".
*
@@ -60,15 +60,15 @@ public class WordCountCounters extends Configured implements Tool
System.exit(0);
}
- public static class SumMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, Cell>, Text, LongWritable>
+ public static class SumMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>, Text, LongWritable>
{
- public void map(ByteBuffer key, SortedMap<ByteBuffer, Cell> columns, Context context) throws IOException, InterruptedException
+ public void map(ByteBuffer key, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> columns, Context context) throws IOException, InterruptedException
{
long sum = 0;
- for (Cell cell : columns.values())
+ for (ColumnFamilyRecordReader.Column column : columns.values())
{
- logger.debug("read " + key + ":" + cell.name() + " from " + context.getInputSplit());
- sum += ByteBufferUtil.toLong(cell.value());
+ logger.debug("read " + key + ":" + ByteBufferUtil.string(column.name) + " from " + context.getInputSplit());
+ sum += ByteBufferUtil.toLong(column.value);
}
context.write(new Text(ByteBufferUtil.string(key)), new LongWritable(sum));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index f89825f..4662fa5 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -25,7 +25,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.db.Cell;
import org.apache.cassandra.thrift.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
@@ -57,7 +56,7 @@ import org.apache.thrift.transport.TTransportException;
* The default split size is 64k rows.
*/
@Deprecated
-public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Cell>>
+public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>>
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
@@ -91,12 +90,12 @@ public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<Byt
return client;
}
- public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+ public RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
{
return new ColumnFamilyRecordReader();
}
- public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
+ public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
{
TaskAttemptContext tac = HadoopCompat.newMapContext(
jobConf,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index c103d75..aee730d 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -29,9 +29,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.schema.LegacySchemaTables;
import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.BufferCell;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.TypeParser;
@@ -49,8 +46,8 @@ import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
@Deprecated
-public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
- implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
+public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>>
+ implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>>
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
@@ -58,7 +55,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
private ColumnFamilySplit split;
private RowIterator iter;
- private Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> currentRow;
+ private Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> currentRow;
private SlicePredicate predicate;
private boolean isEmptyPredicate;
private int totalRowCount; // total number of rows to fetch
@@ -98,7 +95,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return currentRow.left;
}
- public SortedMap<ByteBuffer, Cell> getCurrentValue()
+ public SortedMap<ByteBuffer, Column> getCurrentValue()
{
return currentRow.right;
}
@@ -216,7 +213,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return split.getLocations()[0];
}
- private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>>
+ private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
{
protected List<KeySlice> rows;
protected int totalRead = 0;
@@ -283,50 +280,48 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return totalRead;
}
- protected List<Cell> unthriftify(ColumnOrSuperColumn cosc)
+ protected List<Pair<ByteBuffer, Column>> unthriftify(ColumnOrSuperColumn cosc)
{
if (cosc.counter_column != null)
- return Collections.<Cell>singletonList(unthriftifyCounter(cosc.counter_column));
+ return Collections.singletonList(unthriftifyCounter(cosc.counter_column));
if (cosc.counter_super_column != null)
return unthriftifySuperCounter(cosc.counter_super_column);
if (cosc.super_column != null)
return unthriftifySuper(cosc.super_column);
assert cosc.column != null;
- return Collections.<Cell>singletonList(unthriftifySimple(cosc.column));
+ return Collections.singletonList(unthriftifySimple(cosc.column));
}
- private List<Cell> unthriftifySuper(SuperColumn super_column)
+ private List<Pair<ByteBuffer, Column>> unthriftifySuper(SuperColumn super_column)
{
- List<Cell> cells = new ArrayList<Cell>(super_column.columns.size());
+ List<Pair<ByteBuffer, Column>> columns = new ArrayList<>(super_column.columns.size());
for (org.apache.cassandra.thrift.Column column : super_column.columns)
{
- Cell c = unthriftifySimple(column);
- cells.add(c.withUpdatedName(CellNames.simpleDense(CompositeType.build(super_column.name, c.name().toByteBuffer()))));
+ Pair<ByteBuffer, Column> c = unthriftifySimple(column);
+ columns.add(Pair.create(CompositeType.build(super_column.name, c.left), c.right));
}
- return cells;
+ return columns;
}
- protected Cell unthriftifySimple(org.apache.cassandra.thrift.Column column)
+ protected Pair<ByteBuffer, Column> unthriftifySimple(org.apache.cassandra.thrift.Column column)
{
- return new BufferCell(CellNames.simpleDense(column.name), column.value, column.timestamp);
+ return Pair.create(column.name, Column.fromRegularColumn(column));
}
- private Cell unthriftifyCounter(CounterColumn column)
+ private Pair<ByteBuffer, Column> unthriftifyCounter(CounterColumn column)
{
- //CounterColumns read the counterID from the System keyspace, so need the StorageService running and access
- //to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Cell.
- return new BufferCell(CellNames.simpleDense(column.name), ByteBufferUtil.bytes(column.value), 0);
+ return Pair.create(column.name, Column.fromCounterColumn(column));
}
- private List<Cell> unthriftifySuperCounter(CounterSuperColumn super_column)
+ private List<Pair<ByteBuffer, Column>> unthriftifySuperCounter(CounterSuperColumn super_column)
{
- List<Cell> cells = new ArrayList<Cell>(super_column.columns.size());
+ List<Pair<ByteBuffer, Column>> columns = new ArrayList<>(super_column.columns.size());
for (CounterColumn column : super_column.columns)
{
- Cell c = unthriftifyCounter(column);
- cells.add(c.withUpdatedName(CellNames.simpleDense(CompositeType.build(super_column.name, c.name().toByteBuffer()))));
+ Pair<ByteBuffer, Column> c = unthriftifyCounter(column);
+ columns.add(Pair.create(CompositeType.build(super_column.name, c.left), c.right));
}
- return cells;
+ return columns;
}
}
@@ -405,7 +400,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
}
}
- protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
+ protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
{
maybeInit();
if (rows == null)
@@ -414,12 +409,12 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
totalRead++;
KeySlice ks = rows.get(i++);
AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator;
- SortedMap<ByteBuffer, Cell> map = new TreeMap<ByteBuffer, Cell>(comp);
+ SortedMap<ByteBuffer, Column> map = new TreeMap<>(comp);
for (ColumnOrSuperColumn cosc : ks.columns)
{
- List<Cell> cells = unthriftify(cosc);
- for (Cell cell : cells)
- map.put(cell.name().toByteBuffer(), cell);
+ List<Pair<ByteBuffer, Column>> columns = unthriftify(cosc);
+ for (Pair<ByteBuffer, Column> column : columns)
+ map.put(column.left, column.right);
}
return Pair.create(ks.key, map);
}
@@ -427,7 +422,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
private class WideRowIterator extends RowIterator
{
- private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>> wideColumns;
+ private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>> wideColumns;
private ByteBuffer lastColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
private ByteBuffer lastCountedKey = ByteBufferUtil.EMPTY_BYTE_BUFFER;
@@ -476,13 +471,13 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
}
}
- protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
+ protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
{
maybeInit();
if (rows == null)
return endOfData();
- Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> next = wideColumns.next();
+ Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next = wideColumns.next();
lastColumn = next.right.keySet().iterator().next().duplicate();
maybeIncreaseRowCounter(next);
@@ -494,7 +489,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
* Increases the row counter only if we really moved to the next row.
* @param next just fetched row slice
*/
- private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> next)
+ private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next)
{
ByteBuffer currentKey = next.left;
if (!currentKey.equals(lastCountedKey))
@@ -504,7 +499,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
}
}
- private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>>
+ private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
{
private final Iterator<KeySlice> rows;
private Iterator<ColumnOrSuperColumn> columns;
@@ -525,7 +520,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
columns = currentRow.columns.iterator();
}
- protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
+ protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
{
AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator;
while (true)
@@ -533,20 +528,20 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
if (columns.hasNext())
{
ColumnOrSuperColumn cosc = columns.next();
- SortedMap<ByteBuffer, Cell> map;
- List<Cell> cells = unthriftify(cosc);
- if (cells.size() == 1)
+ SortedMap<ByteBuffer, Column> map;
+ List<Pair<ByteBuffer, Column>> columns = unthriftify(cosc);
+ if (columns.size() == 1)
{
- map = ImmutableSortedMap.of(cells.get(0).name().toByteBuffer(), cells.get(0));
+ map = ImmutableSortedMap.of(columns.get(0).left, columns.get(0).right);
}
else
{
assert isSuper;
- map = new TreeMap<ByteBuffer, Cell>(comp);
- for (Cell cell : cells)
- map.put(cell.name().toByteBuffer(), cell);
+ map = new TreeMap<>(comp);
+ for (Pair<ByteBuffer, Column> column : columns)
+ map.put(column.left, column.right);
}
- return Pair.<ByteBuffer, SortedMap<ByteBuffer, Cell>>create(currentRow.key, map);
+ return Pair.create(currentRow.key, map);
}
if (!rows.hasNext())
@@ -563,7 +558,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
// to the old. Thus, expect a small performance hit.
// And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
// and ColumnFamilyRecordReader don't support them, it should be fine for now.
- public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Cell> value) throws IOException
+ public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Column> value) throws IOException
{
if (this.nextKeyValue())
{
@@ -584,13 +579,37 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return ByteBuffer.wrap(new byte[this.keyBufferSize]);
}
- public SortedMap<ByteBuffer, Cell> createValue()
+ public SortedMap<ByteBuffer, Column> createValue()
{
- return new TreeMap<ByteBuffer, Cell>();
+ return new TreeMap<>();
}
public long getPos() throws IOException
{
return iter.rowsRead();
}
+
+ public static final class Column
+ {
+ public final ByteBuffer name;
+ public final ByteBuffer value;
+ public final long timestamp;
+
+ private Column(ByteBuffer name, ByteBuffer value, long timestamp)
+ {
+ this.name = name;
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ static Column fromRegularColumn(org.apache.cassandra.thrift.Column input)
+ {
+ return new Column(input.name, input.value, input.timestamp);
+ }
+
+ static Column fromCounterColumn(org.apache.cassandra.thrift.CounterColumn input)
+ {
+ return new Column(input.name, ByteBufferUtil.bytes(input.value), 0);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
deleted file mode 100644
index 263e6c0..0000000
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ /dev/null
@@ -1,796 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.pig;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.math.BigInteger;
-import java.net.URLDecoder;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.*;
-
-import org.apache.cassandra.transport.Server;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.schema.LegacySchemaTables;
-import org.apache.cassandra.serializers.CollectionSerializer;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.pig.*;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.*;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-
-/**
- * A LoadStoreFunc for retrieving data from and storing data to Cassandra
- */
-public abstract class AbstractCassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
-{
-
- protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
-
- // system environment variables that can be set to configure connection info:
- // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
- public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
- public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
- public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
- public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
- public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
- public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
- public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
- public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
- public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
- public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
- public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
- public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
-
- protected String DEFAULT_INPUT_FORMAT;
- protected String DEFAULT_OUTPUT_FORMAT;
-
- public final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
-
- private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraStorage.class);
-
- protected String username;
- protected String password;
- protected String keyspace;
- protected String column_family;
- protected String loadSignature;
- protected String storeSignature;
-
- protected Configuration conf;
- protected String inputFormatClass;
- protected String outputFormatClass;
- protected int splitSize = 64 * 1024;
- protected String partitionerClass;
- protected boolean usePartitionFilter = false;
- protected String initHostAddress;
- protected String rpcPort;
- protected int nativeProtocolVersion = 1;
-
-
- public AbstractCassandraStorage()
- {
- super();
- }
-
- /** Deconstructs a composite type to a Tuple. */
- protected Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException
- {
- List<CompositeComponent> result = comparator.deconstruct(name);
- Tuple t = TupleFactory.getInstance().newTuple(result.size());
- for (int i=0; i<result.size(); i++)
- setTupleValue(t, i, cassandraToObj(result.get(i).comparator, result.get(i).value));
-
- return t;
- }
-
- /** convert a column to a tuple */
- protected Tuple columnToTuple(Cell col, CfInfo cfInfo, AbstractType comparator) throws IOException
- {
- CfDef cfDef = cfInfo.cfDef;
- Tuple pair = TupleFactory.getInstance().newTuple(2);
-
- ByteBuffer colName = col.name().toByteBuffer();
-
- // name
- if(comparator instanceof AbstractCompositeType)
- setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,colName));
- else
- setTupleValue(pair, 0, cassandraToObj(comparator, colName));
-
- // value
- Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
- {
- ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(colName);
- colName = names[names.length-1];
- }
- if (validators.get(colName) == null)
- {
- Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
- setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
- }
- else
- setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value()));
- return pair;
- }
-
- /** set the value to the position of the tuple */
- protected void setTupleValue(Tuple pair, int position, Object value) throws ExecException
- {
- if (value instanceof BigInteger)
- pair.set(position, ((BigInteger) value).intValue());
- else if (value instanceof ByteBuffer)
- pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
- else if (value instanceof UUID)
- pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value)));
- else if (value instanceof Date)
- pair.set(position, TimestampType.instance.decompose((Date) value).getLong());
- else
- pair.set(position, value);
- }
-
- /** get the columnfamily definition for the signature */
- protected CfInfo getCfInfo(String signature) throws IOException
- {
- UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- String prop = property.getProperty(signature);
- CfInfo cfInfo = new CfInfo();
- cfInfo.cfDef = cfdefFromString(prop.substring(2));
- cfInfo.compactCqlTable = prop.charAt(0) == '1' ? true : false;
- cfInfo.cql3Table = prop.charAt(1) == '1' ? true : false;
- return cfInfo;
- }
-
- /** construct a map to store the mashaller type to cassandra data type mapping */
- protected Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
- {
- Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class);
- AbstractType comparator;
- AbstractType subcomparator;
- AbstractType default_validator;
- AbstractType key_validator;
-
- comparator = parseType(cfDef.getComparator_type());
- subcomparator = parseType(cfDef.getSubcomparator_type());
- default_validator = parseType(cfDef.getDefault_validation_class());
- key_validator = parseType(cfDef.getKey_validation_class());
-
- marshallers.put(MarshallerType.COMPARATOR, comparator);
- marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator);
- marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator);
- marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator);
- return marshallers;
- }
-
- /** get the validators */
- protected Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
- {
- Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
- for (ColumnDef cd : cfDef.getColumn_metadata())
- {
- if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
- {
- AbstractType validator = null;
- try
- {
- validator = TypeParser.parse(cd.getValidation_class());
- if (validator instanceof CounterColumnType)
- validator = LongType.instance;
- validators.put(cd.name, validator);
- }
- catch (ConfigurationException | SyntaxException e)
- {
- throw new IOException(e);
- }
- }
- }
- return validators;
- }
-
- /** parse the string to a cassandra data type */
- protected AbstractType parseType(String type) throws IOException
- {
- try
- {
- // always treat counters like longs, specifically CCT.compose is not what we need
- if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
- return LongType.instance;
- return TypeParser.parse(type);
- }
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
- catch (SyntaxException e)
- {
- throw new IOException(e);
- }
- }
-
- @Override
- public InputFormat getInputFormat() throws IOException
- {
- try
- {
- return FBUtilities.construct(inputFormatClass, "inputformat");
- }
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
- }
-
- /** decompose the query to store the parameters in a map */
- public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
- {
- String[] params = query.split("&");
- Map<String, String> map = new HashMap<String, String>(params.length);
- for (String param : params)
- {
- String[] keyValue = param.split("=");
- map.put(keyValue[0], URLDecoder.decode(keyValue[1],"UTF-8"));
- }
- return map;
- }
-
- /** set hadoop cassandra connection settings */
- protected void setConnectionInformation() throws IOException
- {
- if (System.getenv(PIG_RPC_PORT) != null)
- {
- ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
- ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
- }
-
- if (System.getenv(PIG_INPUT_RPC_PORT) != null)
- ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT));
- if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
- ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT));
-
- if (System.getenv(PIG_INITIAL_ADDRESS) != null)
- {
- ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
- ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
- }
- if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
- ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS));
- if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
- ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
-
- if (System.getenv(PIG_PARTITIONER) != null)
- {
- ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
- ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
- }
- if(System.getenv(PIG_INPUT_PARTITIONER) != null)
- ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
- if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
- ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
- if (System.getenv(PIG_INPUT_FORMAT) != null)
- inputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_INPUT_FORMAT));
- else
- inputFormatClass = DEFAULT_INPUT_FORMAT;
- if (System.getenv(PIG_OUTPUT_FORMAT) != null)
- outputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_OUTPUT_FORMAT));
- else
- outputFormatClass = DEFAULT_OUTPUT_FORMAT;
- }
-
- /** get the full class name */
- protected String getFullyQualifiedClassName(String classname)
- {
- return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
- }
-
- /** get pig type for the cassandra data type*/
- protected byte getPigType(AbstractType type)
- {
- if (type instanceof LongType || type instanceof DateType || type instanceof TimestampType) // DateType is bad and it should feel bad
- return DataType.LONG;
- else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger
- return DataType.INTEGER;
- else if (type instanceof AsciiType || type instanceof UTF8Type || type instanceof DecimalType || type instanceof InetAddressType)
- return DataType.CHARARRAY;
- else if (type instanceof FloatType)
- return DataType.FLOAT;
- else if (type instanceof DoubleType)
- return DataType.DOUBLE;
- else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
- return DataType.TUPLE;
-
- return DataType.BYTEARRAY;
- }
-
- public ResourceStatistics getStatistics(String location, Job job)
- {
- return null;
- }
-
- @Override
- public String relativeToAbsolutePath(String location, Path curDir) throws IOException
- {
- return location;
- }
-
- @Override
- public void setUDFContextSignature(String signature)
- {
- this.loadSignature = signature;
- }
-
- /** StoreFunc methods */
- public void setStoreFuncUDFContextSignature(String signature)
- {
- this.storeSignature = signature;
- }
-
- public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
- {
- return relativeToAbsolutePath(location, curDir);
- }
-
- /** output format */
- public OutputFormat getOutputFormat() throws IOException
- {
- try
- {
- return FBUtilities.construct(outputFormatClass, "outputformat");
- }
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
- }
-
- public void checkSchema(ResourceSchema schema) throws IOException
- {
- // we don't care about types, they all get casted to ByteBuffers
- }
-
- protected abstract ByteBuffer nullToBB();
-
- /** convert object to ByteBuffer */
- protected ByteBuffer objToBB(Object o)
- {
- if (o == null)
- return nullToBB();
- if (o instanceof java.lang.String)
- return ByteBuffer.wrap(new DataByteArray((String)o).get());
- if (o instanceof Integer)
- return Int32Type.instance.decompose((Integer)o);
- if (o instanceof Long)
- return LongType.instance.decompose((Long)o);
- if (o instanceof Float)
- return FloatType.instance.decompose((Float)o);
- if (o instanceof Double)
- return DoubleType.instance.decompose((Double)o);
- if (o instanceof UUID)
- return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
- if(o instanceof Tuple) {
- List<Object> objects = ((Tuple)o).getAll();
- //collections
- if (objects.size() > 0 && objects.get(0) instanceof String)
- {
- String collectionType = (String) objects.get(0);
- if ("set".equalsIgnoreCase(collectionType) ||
- "list".equalsIgnoreCase(collectionType))
- return objToListOrSetBB(objects.subList(1, objects.size()));
- else if ("map".equalsIgnoreCase(collectionType))
- return objToMapBB(objects.subList(1, objects.size()));
-
- }
- return objToCompositeBB(objects);
- }
-
- return ByteBuffer.wrap(((DataByteArray) o).get());
- }
-
- private ByteBuffer objToListOrSetBB(List<Object> objects)
- {
- List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
- for(Object sub : objects)
- {
- ByteBuffer buffer = objToBB(sub);
- serialized.add(buffer);
- }
- // NOTE: using protocol v1 serialization format for collections so as to not break
- // compatibility. Not sure if that's the right thing.
- return CollectionSerializer.pack(serialized, objects.size(), Server.VERSION_1);
- }
-
- private ByteBuffer objToMapBB(List<Object> objects)
- {
- List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
- for(Object sub : objects)
- {
- List<Object> keyValue = ((Tuple)sub).getAll();
- for (Object entry: keyValue)
- {
- ByteBuffer buffer = objToBB(entry);
- serialized.add(buffer);
- }
- }
- // NOTE: using protocol v1 serialization format for collections so as to not break
- // compatibility. Not sure if that's the right thing.
- return CollectionSerializer.pack(serialized, objects.size(), Server.VERSION_1);
- }
-
- private ByteBuffer objToCompositeBB(List<Object> objects)
- {
- List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
- int totalLength = 0;
- for(Object sub : objects)
- {
- ByteBuffer buffer = objToBB(sub);
- serialized.add(buffer);
- totalLength += 2 + buffer.remaining() + 1;
- }
- ByteBuffer out = ByteBuffer.allocate(totalLength);
- for (ByteBuffer bb : serialized)
- {
- int length = bb.remaining();
- out.put((byte) ((length >> 8) & 0xFF));
- out.put((byte) (length & 0xFF));
- out.put(bb);
- out.put((byte) 0);
- }
- out.flip();
- return out;
- }
-
- public void cleanupOnFailure(String failure, Job job)
- {
- }
-
- public void cleanupOnSuccess(String location, Job job) throws IOException {
- }
-
-
- /** Methods to get the column family schema from Cassandra */
- protected void initSchema(String signature) throws IOException
- {
- Properties properties = UDFContext.getUDFContext().getUDFProperties(AbstractCassandraStorage.class);
-
- // Only get the schema if we haven't already gotten it
- if (!properties.containsKey(signature))
- {
- try
- {
- Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
- client.set_keyspace(keyspace);
-
- if (username != null && password != null)
- {
- Map<String, String> credentials = new HashMap<String, String>(2);
- credentials.put(PasswordAuthenticator.USERNAME_KEY, username);
- credentials.put(PasswordAuthenticator.PASSWORD_KEY, password);
-
- try
- {
- client.login(new AuthenticationRequest(credentials));
- }
- catch (AuthenticationException e)
- {
- logger.error("Authentication exception: invalid username and/or password");
- throw new IOException(e);
- }
- }
-
- // compose the CfDef for the columfamily
- CfInfo cfInfo = getCfInfo(client);
-
- if (cfInfo.cfDef != null)
- {
- StringBuilder sb = new StringBuilder();
- sb.append(cfInfo.compactCqlTable ? 1 : 0).append(cfInfo.cql3Table ? 1: 0).append(cfdefToString(cfInfo.cfDef));
- properties.setProperty(signature, sb.toString());
- }
- else
- throw new IOException(String.format("Table '%s' not found in keyspace '%s'",
- column_family,
- keyspace));
- }
- catch (Exception e)
- {
- throw new IOException(e);
- }
- }
- }
-
- /** convert CfDef to string */
- protected static String cfdefToString(CfDef cfDef) throws IOException
- {
- assert cfDef != null;
- // this is so awful it's kind of cool!
- TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
- try
- {
- return Hex.bytesToHex(serializer.serialize(cfDef));
- }
- catch (TException e)
- {
- throw new IOException(e);
- }
- }
-
- /** convert string back to CfDef */
- protected static CfDef cfdefFromString(String st) throws IOException
- {
- assert st != null;
- TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
- CfDef cfDef = new CfDef();
- try
- {
- deserializer.deserialize(cfDef, Hex.hexToBytes(st));
- }
- catch (TException e)
- {
- throw new IOException(e);
- }
- return cfDef;
- }
-
- /** return the CfInfo for the column family */
- protected CfInfo getCfInfo(Cassandra.Client client)
- throws InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException,
- TException,
- NotFoundException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- IOException
- {
- // get CF meta data
- String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator, key_aliases " +
- "FROM %s.%s " +
- "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
- SystemKeyspace.NAME,
- LegacySchemaTables.COLUMNFAMILIES,
- keyspace,
- column_family);
-
- CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
- if (result == null || result.rows == null || result.rows.isEmpty())
- return null;
-
- Iterator<CqlRow> iteraRow = result.rows.iterator();
- CfDef cfDef = new CfDef();
- cfDef.keyspace = keyspace;
- cfDef.name = column_family;
- boolean cql3Table = false;
- if (iteraRow.hasNext())
- {
- CqlRow cqlRow = iteraRow.next();
-
- cfDef.column_type = ByteBufferUtil.string(cqlRow.columns.get(0).value);
- cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(1).value);
- ByteBuffer subComparator = cqlRow.columns.get(2).value;
- if (subComparator != null)
- cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
- cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value);
- cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value);
- String keyAliases = ByteBufferUtil.string(cqlRow.columns.get(5).value);
- if (FBUtilities.fromJsonList(keyAliases).size() > 0)
- cql3Table = true;
- }
- cfDef.column_metadata = getColumnMetadata(client);
- CfInfo cfInfo = new CfInfo();
- cfInfo.cfDef = cfDef;
- if (cql3Table && !(parseType(cfDef.comparator_type) instanceof AbstractCompositeType))
- cfInfo.compactCqlTable = true;
- if (cql3Table)
- cfInfo.cql3Table = true;;
- return cfInfo;
- }
-
- /** get a list of columns */
- protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client)
- throws InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException,
- TException,
- CharacterCodingException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- NotFoundException;
-
- /** get column meta data */
- protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn)
- throws InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException,
- TException,
- CharacterCodingException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- NotFoundException
- {
- String query = String.format("SELECT column_name, validator, index_type, type " +
- "FROM %s.%s " +
- "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
- SystemKeyspace.NAME,
- LegacySchemaTables.COLUMNS,
- keyspace,
- column_family);
-
- CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
- List<CqlRow> rows = result.rows;
- List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
- if (rows == null || rows.isEmpty())
- {
- // if CassandraStorage, just return the empty list
- if (cassandraStorage)
- return columnDefs;
-
- // otherwise for CqlNativeStorage, check metadata for classic thrift tables
- CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
- for (ColumnDefinition def : cfm.regularAndStaticColumns())
- {
- ColumnDef cDef = new ColumnDef();
- String columnName = def.name.toString();
- String type = def.type.toString();
- logger.debug("name: {}, type: {} ", columnName, type);
- cDef.name = ByteBufferUtil.bytes(columnName);
- cDef.validation_class = type;
- columnDefs.add(cDef);
- }
- // we may not need to include the value column for compact tables as we
- // could have already processed it as schema_columnfamilies.value_alias
- if (columnDefs.size() == 0 && includeCompactValueColumn && cfm.compactValueColumn() != null)
- {
- ColumnDefinition def = cfm.compactValueColumn();
- if ("value".equals(def.name.toString()))
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = def.name.bytes;
- cDef.validation_class = def.type.toString();
- columnDefs.add(cDef);
- }
- }
- return columnDefs;
- }
-
- Iterator<CqlRow> iterator = rows.iterator();
- while (iterator.hasNext())
- {
- CqlRow row = iterator.next();
- ColumnDef cDef = new ColumnDef();
- String type = ByteBufferUtil.string(row.getColumns().get(3).value);
- if (!type.equals("regular"))
- continue;
- cDef.setName(ByteBufferUtil.clone(row.getColumns().get(0).value));
- cDef.validation_class = ByteBufferUtil.string(row.getColumns().get(1).value);
- ByteBuffer indexType = row.getColumns().get(2).value;
- if (indexType != null)
- cDef.index_type = getIndexType(ByteBufferUtil.string(indexType));
- columnDefs.add(cDef);
- }
- return columnDefs;
- }
-
- /** get index type from string */
- protected IndexType getIndexType(String type)
- {
- type = type.toLowerCase();
- if ("keys".equals(type))
- return IndexType.KEYS;
- else if("custom".equals(type))
- return IndexType.CUSTOM;
- else if("composites".equals(type))
- return IndexType.COMPOSITES;
- else
- return null;
- }
-
- /** return partition keys */
- public String[] getPartitionKeys(String location, Job job) throws IOException
- {
- if (!usePartitionFilter)
- return null;
- List<ColumnDef> indexes = getIndexes();
- String[] partitionKeys = new String[indexes.size()];
- for (int i = 0; i < indexes.size(); i++)
- {
- partitionKeys[i] = new String(indexes.get(i).getName());
- }
- return partitionKeys;
- }
-
- /** get a list of columns with defined index*/
- protected List<ColumnDef> getIndexes() throws IOException
- {
- CfDef cfdef = getCfInfo(loadSignature).cfDef;
- List<ColumnDef> indexes = new ArrayList<ColumnDef>();
- for (ColumnDef cdef : cfdef.column_metadata)
- {
- if (cdef.index_type != null)
- indexes.add(cdef);
- }
- return indexes;
- }
-
- /** get CFMetaData of a column family */
- protected CFMetaData getCFMetaData(String ks, String cf, Cassandra.Client client)
- throws NotFoundException,
- InvalidRequestException,
- TException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException
- {
- KsDef ksDef = client.describe_keyspace(ks);
- for (CfDef cfDef : ksDef.cf_defs)
- {
- if (cfDef.name.equalsIgnoreCase(cf))
- return ThriftConversion.fromThrift(cfDef);
- }
- return null;
- }
-
- protected Object cassandraToObj(AbstractType validator, ByteBuffer value)
- {
- if (validator instanceof DecimalType || validator instanceof InetAddressType)
- return validator.getString(value);
-
- if (validator instanceof CollectionType)
- {
- // For CollectionType, the compose() method assumes the v3 protocol format of collection, which
- // is not correct here since we query using the CQL-over-thrift interface which use the pre-v3 format
- return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion);
- }
-
- return validator.compose(value);
- }
-
- protected static class CfInfo
- {
- boolean compactCqlTable = false;
- boolean cql3Table = false;
- CfDef cfDef;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 1ad80b7..5d354a7 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -30,12 +30,12 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.PasswordAuthenticator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.hadoop.ColumnFamilyRecordReader;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.HadoopCompat;
import org.apache.cassandra.schema.LegacySchemaTables;
@@ -83,7 +83,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
private boolean slice_reverse = false;
private boolean allow_deletes = false;
- private RecordReader<ByteBuffer, Map<ByteBuffer, Cell>> reader;
+ private RecordReader<ByteBuffer, Map<ByteBuffer, ColumnFamilyRecordReader.Column>> reader;
private RecordWriter<ByteBuffer, List<Mutation>> writer;
private boolean widerows = false;
@@ -113,7 +113,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
// wide row hacks
private ByteBuffer lastKey;
- private Map<ByteBuffer, Cell> lastRow;
+ private Map<ByteBuffer, ColumnFamilyRecordReader.Column> lastRow;
private boolean hasNext = true;
public CassandraStorage()
@@ -164,7 +164,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
key = reader.getCurrentKey();
tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
}
- for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
@@ -202,7 +202,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
else
addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
- for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
@@ -216,17 +216,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
else
addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
}
- SortedMap<ByteBuffer, Cell> row = (SortedMap<ByteBuffer, Cell>)reader.getCurrentValue();
+ SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> row =
+ (SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>)reader.getCurrentValue();
if (lastRow != null) // prepend what was read last time
{
- for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
}
- for (Map.Entry<ByteBuffer, Cell> entry : row.entrySet())
+ for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : row.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
@@ -251,7 +252,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = reader.getCurrentKey();
- Map<ByteBuffer, Cell> cf = reader.getCurrentValue();
+ Map<ByteBuffer, ColumnFamilyRecordReader.Column> cf = reader.getCurrentValue();
assert key != null && cf != null;
// output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
@@ -285,7 +286,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
added.put(cdef.name, true);
}
// now add all the other columns
- for (Map.Entry<ByteBuffer, Cell> entry : cf.entrySet())
+ for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : cf.entrySet())
{
if (!added.containsKey(entry.getKey()))
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
@@ -1338,27 +1339,25 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
}
/** convert a column to a tuple */
- protected Tuple columnToTuple(Cell col, CfDef cfDef, AbstractType comparator) throws IOException
+ protected Tuple columnToTuple(ColumnFamilyRecordReader.Column column, CfDef cfDef, AbstractType comparator) throws IOException
{
Tuple pair = TupleFactory.getInstance().newTuple(2);
- ByteBuffer colName = col.name().toByteBuffer();
-
// name
if(comparator instanceof AbstractCompositeType)
- StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, colName));
+ StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, column.name));
else
- StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, colName, nativeProtocolVersion));
+ StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, column.name, nativeProtocolVersion));
// value
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- if (validators.get(colName) == null)
+ if (validators.get(column.name) == null)
{
Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
- StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value(), nativeProtocolVersion));
+ StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), column.value, nativeProtocolVersion));
}
else
- StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(colName), col.value(), nativeProtocolVersion));
+ StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(column.name), column.value, nativeProtocolVersion));
return pair;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 537f30c..dc3c174 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -34,9 +34,6 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import org.apache.cassandra.db.BufferCell;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.AuthenticationException;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -159,9 +156,9 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
ByteBuffer columnValue = row.getBytesUnsafe(cdef.getName());
if (columnValue != null)
{
- Cell cell = new BufferCell(CellNames.simpleDense(ByteBufferUtil.bytes(cdef.getName())), columnValue);
AbstractType<?> validator = getValidatorMap(tableMetadata).get(ByteBufferUtil.bytes(cdef.getName()));
- setTupleValue(tuple, i, cqlColumnToObj(cell, tableMetadata), validator);
+ setTupleValue(tuple, i, cqlColumnToObj(ByteBufferUtil.bytes(cdef.getName()), columnValue,
+ tableMetadata), validator);
}
else
tuple.set(i, null);
@@ -176,12 +173,11 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
}
/** convert a cql column to an object */
- private Object cqlColumnToObj(Cell col, TableInfo cfDef) throws IOException
+ private Object cqlColumnToObj(ByteBuffer name, ByteBuffer columnValue, TableInfo cfDef) throws IOException
{
// standard
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- ByteBuffer cellName = col.name().toByteBuffer();
- return StorageHelper.cassandraToObj(validators.get(cellName), col.value(), nativeProtocolVersion);
+ return StorageHelper.cassandraToObj(validators.get(name), columnValue, nativeProtocolVersion);
}
/** set the value to the position of the tuple */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/test/conf/logback-test.xml
----------------------------------------------------------------------
diff --git a/test/conf/logback-test.xml b/test/conf/logback-test.xml
index 8d99aa2..6d75aaf 100644
--- a/test/conf/logback-test.xml
+++ b/test/conf/logback-test.xml
@@ -61,7 +61,9 @@
<level>WARN</level>
</filter>
</appender>
-
+
+ <logger name="org.apache.hadoop" level="WARN"/>
+
<root level="DEBUG">
<appender-ref ref="ASYNCFILE" />
<appender-ref ref="STDERR" />
[3/3] cassandra git commit: Merge branch 'cassandra-2.2' into trunk
Posted by sa...@apache.org.
Merge branch 'cassandra-2.2' into trunk
Conflicts:
src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9718e13d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9718e13d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9718e13d
Branch: refs/heads/trunk
Commit: 9718e13d7ec5d7cf88095d3db81091ca75247e15
Parents: 78c0305 1936657
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Thu Jun 4 10:55:19 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Thu Jun 4 10:55:19 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
examples/hadoop_cql3_word_count/README.txt | 14 +-
.../bin/word_count_counters | 1 +
.../conf/log4j.properties | 32 -
.../hadoop_cql3_word_count/conf/logback.xml | 42 +
.../hadoop_cql3_word_count/src/WordCount.java | 4 +-
examples/hadoop_word_count/README.txt | 10 +-
.../hadoop_word_count/bin/word_count_counters | 1 +
.../hadoop_word_count/conf/log4j.properties | 32 -
examples/hadoop_word_count/conf/logback.xml | 42 +
examples/hadoop_word_count/src/WordCount.java | 20 +-
.../src/WordCountCounters.java | 24 +-
.../hadoop/ColumnFamilyInputFormat.java | 7 +-
.../hadoop/ColumnFamilyRecordReader.java | 117 +--
.../hadoop/pig/AbstractCassandraStorage.java | 775 -------------------
.../cassandra/hadoop/pig/CassandraStorage.java | 35 +-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 12 +-
test/conf/logback-test.xml | 4 +-
18 files changed, 218 insertions(+), 955 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9718e13d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 688e8ad,882279f..ea619e6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,10 -1,5 +1,11 @@@
+3.0:
+ * Decommissioned nodes will not rejoin the cluster (CASSANDRA-8801)
+ * Change gossip stabilization to use endpoint size (CASSANDRA-9401)
+ * Change default garbage collector to G1 (CASSANDRA-7486)
+
+
2.2
+ * Remove use of Cell in Thrift MapReduce classes (CASSANDRA-8609)
* Integrate pre-release Java Driver 2.2-rc1, custom build (CASSANDRA-9493)
* Clean up gossiper logic for old versions (CASSANDRA-9370)
* Fix custom payload coding/decoding to match the spec (CASSANDRA-9515)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9718e13d/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 0635459,5d354a7..7bf43ef
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@@ -113,7 -113,8 +113,7 @@@ public class CassandraStorage extends L
// wide row hacks
private ByteBuffer lastKey;
- private Map<ByteBuffer, Cell> lastRow;
+ private Map<ByteBuffer, ColumnFamilyRecordReader.Column> lastRow;
- private boolean hasNext = true;
public CassandraStorage()
{
[2/3] cassandra git commit: Remove use of Cell in Thrift M/R classes
Posted by sa...@apache.org.
Remove use of Cell in Thrift M/R classes
Patch and review by Philip Thompson and Sam Tunnicliffe for
CASSANDRA-8609
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/19366575
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/19366575
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/19366575
Branch: refs/heads/trunk
Commit: 1936657570655129052cc48fa373c155086a6456
Parents: 16b0288
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Tue Jun 2 15:56:36 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Thu Jun 4 10:46:57 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
examples/hadoop_cql3_word_count/README.txt | 14 +-
.../bin/word_count_counters | 1 +
.../conf/log4j.properties | 32 -
.../hadoop_cql3_word_count/conf/logback.xml | 42 +
.../hadoop_cql3_word_count/src/WordCount.java | 4 +-
examples/hadoop_word_count/README.txt | 10 +-
.../hadoop_word_count/bin/word_count_counters | 1 +
.../hadoop_word_count/conf/log4j.properties | 32 -
examples/hadoop_word_count/conf/logback.xml | 42 +
examples/hadoop_word_count/src/WordCount.java | 20 +-
.../src/WordCountCounters.java | 24 +-
.../hadoop/ColumnFamilyInputFormat.java | 7 +-
.../hadoop/ColumnFamilyRecordReader.java | 117 +--
.../hadoop/pig/AbstractCassandraStorage.java | 796 -------------------
.../cassandra/hadoop/pig/CassandraStorage.java | 35 +-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 12 +-
test/conf/logback-test.xml | 4 +-
18 files changed, 218 insertions(+), 976 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 933f5a6..882279f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2
+ * Remove use of Cell in Thrift MapReduce classes (CASSANDRA-8609)
* Integrate pre-release Java Driver 2.2-rc1, custom build (CASSANDRA-9493)
* Clean up gossiper logic for old versions (CASSANDRA-9370)
* Fix custom payload coding/decoding to match the spec (CASSANDRA-9515)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/README.txt
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/README.txt b/examples/hadoop_cql3_word_count/README.txt
index b69bdd5..b6ee33f 100644
--- a/examples/hadoop_cql3_word_count/README.txt
+++ b/examples/hadoop_cql3_word_count/README.txt
@@ -5,14 +5,16 @@ WordCount hadoop example: Inserts a bunch of words across multiple rows,
and counts them, with RandomPartitioner. The word_count_counters example sums
the value of counter columns for a key.
-The scripts in bin/ assume you are running with cwd of contrib/word_count.
+The scripts in bin/ assume you are running with cwd of examples/word_count.
Running
=======
-First build and start a Cassandra server with the default configuration*,
-then run
+First build and start a Cassandra server with the default configuration*. Ensure that the Thrift
+interface is enabled, either by setting start_rpc:true in cassandra.yaml or by running
+`nodetool enablethrift` after startup.
+Once Cassandra has started and the Thrift interface is available, run
contrib/word_count$ ant
contrib/word_count$ bin/word_count_setup
@@ -22,14 +24,14 @@ contrib/word_count$ bin/word_count_counters
In order to view the results in Cassandra, one can use bin/cqlsh and
perform the following operations:
$ bin/cqlsh localhost
-> use cql3_worldcount;
+> use cql3_wordcount;
> select * from output_words;
The output of the word count can now be configured. In the bin/word_count
file, you can specify the OUTPUT_REDUCER. The two options are 'filesystem'
and 'cassandra'. The filesystem option outputs to the /tmp/word_count*
directories. The cassandra option outputs to the 'output_words' column family
-in the 'cql3_worldcount' keyspace. 'cassandra' is the default.
+in the 'cql3_wordcount' keyspace. 'cassandra' is the default.
Read the code in src/ for more details.
@@ -45,5 +47,5 @@ settings accordingly.
Troubleshooting
===============
-word_count uses conf/log4j.properties to log to wc.out.
+word_count uses conf/logback.xml to log to wc.out.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count_counters b/examples/hadoop_cql3_word_count/bin/word_count_counters
index 0b69b40..52ea2e5 100755
--- a/examples/hadoop_cql3_word_count/bin/word_count_counters
+++ b/examples/hadoop_cql3_word_count/bin/word_count_counters
@@ -30,6 +30,7 @@ if [ ! -e $cwd/../build/word_count.jar ]; then
exit 1
fi
+CLASSPATH=$CLASSPATH:$cwd/../conf
CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main
CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/conf/log4j.properties b/examples/hadoop_cql3_word_count/conf/log4j.properties
deleted file mode 100644
index 508d60f..0000000
--- a/examples/hadoop_cql3_word_count/conf/log4j.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-log4j.rootLogger=INFO,stdout,F
-
-#stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
-
-# log file
-log4j.appender.F=org.apache.log4j.FileAppender
-log4j.appender.F.Append=false
-log4j.appender.F.layout=org.apache.log4j.PatternLayout
-log4j.appender.F.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
-# Edit the next line to point to your logs directory
-log4j.appender.F.File=wc.out
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/conf/logback.xml
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/conf/logback.xml b/examples/hadoop_cql3_word_count/conf/logback.xml
new file mode 100644
index 0000000..443bd1c
--- /dev/null
+++ b/examples/hadoop_cql3_word_count/conf/logback.xml
@@ -0,0 +1,42 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration scan="true">
+
+ <jmxConfigurator />
+
+ <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+ <file>wc.out</file>
+ <encoder>
+ <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="INFO">
+ <appender-ref ref="FILE" />
+ <appender-ref ref="STDOUT" />
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
index 3702a2b..bc95736 100644
--- a/examples/hadoop_cql3_word_count/src/WordCount.java
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -44,7 +44,7 @@ import com.datastax.driver.core.Row;
/**
* This counts the occurrences of words in ColumnFamily
- * cql3_worldcount ( id uuid,
+ * cql3_wordcount ( id uuid,
* line text,
* PRIMARY KEY (id))
*
@@ -60,7 +60,7 @@ public class WordCount extends Configured implements Tool
{
private static final Logger logger = LoggerFactory.getLogger(WordCount.class);
static final String INPUT_MAPPER_VAR = "input_mapper";
- static final String KEYSPACE = "cql3_worldcount";
+ static final String KEYSPACE = "cql3_wordcount";
static final String COLUMN_FAMILY = "inputs";
static final String OUTPUT_REDUCER_VAR = "output_reducer";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/README.txt
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/README.txt b/examples/hadoop_word_count/README.txt
index ec6f512..e336b89 100644
--- a/examples/hadoop_word_count/README.txt
+++ b/examples/hadoop_word_count/README.txt
@@ -5,14 +5,16 @@ WordCount hadoop example: Inserts a bunch of words across multiple rows,
and counts them, with RandomPartitioner. The word_count_counters example sums
the value of counter columns for a key.
-The scripts in bin/ assume you are running with cwd of contrib/word_count.
+The scripts in bin/ assume you are running with cwd of examples/word_count.
Running
=======
-First build and start a Cassandra server with the default configuration*,
-then run
+First build and start a Cassandra server with the default configuration*. Ensure that the Thrift
+interface is enabled, either by setting start_rpc:true in cassandra.yaml or by running
+`nodetool enablethrift` after startup.
+Once Cassandra has started and the Thrift interface is available, run
contrib/word_count$ ant
contrib/word_count$ bin/word_count_setup
@@ -45,4 +47,4 @@ settings accordingly.
Troubleshooting
===============
-word_count uses conf/log4j.properties to log to wc.out.
+word_count uses conf/logback.xml to log to wc.out.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/bin/word_count_counters b/examples/hadoop_word_count/bin/word_count_counters
index 7793477..58c398c 100755
--- a/examples/hadoop_word_count/bin/word_count_counters
+++ b/examples/hadoop_word_count/bin/word_count_counters
@@ -30,6 +30,7 @@ if [ ! -e $cwd/../build/word_count.jar ]; then
exit 1
fi
+CLASSPATH=$CLASSPATH:$cwd/../conf
CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main
CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/conf/log4j.properties b/examples/hadoop_word_count/conf/log4j.properties
deleted file mode 100644
index 508d60f..0000000
--- a/examples/hadoop_word_count/conf/log4j.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-log4j.rootLogger=INFO,stdout,F
-
-#stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
-
-# log file
-log4j.appender.F=org.apache.log4j.FileAppender
-log4j.appender.F.Append=false
-log4j.appender.F.layout=org.apache.log4j.PatternLayout
-log4j.appender.F.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
-# Edit the next line to point to your logs directory
-log4j.appender.F.File=wc.out
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/conf/logback.xml
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/conf/logback.xml b/examples/hadoop_word_count/conf/logback.xml
new file mode 100644
index 0000000..443bd1c
--- /dev/null
+++ b/examples/hadoop_word_count/conf/logback.xml
@@ -0,0 +1,42 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration scan="true">
+
+ <jmxConfigurator />
+
+ <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+ <file>wc.out</file>
+ <encoder>
+ <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="INFO">
+ <appender-ref ref="FILE" />
+ <appender-ref ref="STDOUT" />
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/src/WordCount.java b/examples/hadoop_word_count/src/WordCount.java
index f6bca77..d092f1f 100644
--- a/examples/hadoop_word_count/src/WordCount.java
+++ b/examples/hadoop_word_count/src/WordCount.java
@@ -20,15 +20,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
-import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -71,7 +67,7 @@ public class WordCount extends Configured implements Tool
System.exit(0);
}
- public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, Cell>, Text, IntWritable>
+ public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@@ -82,17 +78,17 @@ public class WordCount extends Configured implements Tool
{
}
- public void map(ByteBuffer key, SortedMap<ByteBuffer, Cell> columns, Context context) throws IOException, InterruptedException
+ public void map(ByteBuffer key, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> columns, Context context) throws IOException, InterruptedException
{
- for (Cell cell : columns.values())
+ for (ColumnFamilyRecordReader.Column column : columns.values())
{
- String name = ByteBufferUtil.string(cell.name().toByteBuffer());
+ String name = ByteBufferUtil.string(column.name);
String value = null;
if (name.contains("int"))
- value = String.valueOf(ByteBufferUtil.toInt(cell.value()));
+ value = String.valueOf(ByteBufferUtil.toInt(column.value));
else
- value = ByteBufferUtil.string(cell.value());
+ value = ByteBufferUtil.string(column.value);
logger.debug("read {}:{}={} from {}",
new Object[] {ByteBufferUtil.string(key), name, value, context.getInputSplit()});
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/src/WordCountCounters.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/src/WordCountCounters.java b/examples/hadoop_word_count/src/WordCountCounters.java
index 39fb778..98c8579 100644
--- a/examples/hadoop_word_count/src/WordCountCounters.java
+++ b/examples/hadoop_word_count/src/WordCountCounters.java
@@ -20,26 +20,26 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.SortedMap;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.thrift.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.ColumnFamilyRecordReader;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
/**
* This sums the word count stored in the input_words_count ColumnFamily for the key "key-if-verse1".
*
@@ -60,15 +60,15 @@ public class WordCountCounters extends Configured implements Tool
System.exit(0);
}
- public static class SumMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, Cell>, Text, LongWritable>
+ public static class SumMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>, Text, LongWritable>
{
- public void map(ByteBuffer key, SortedMap<ByteBuffer, Cell> columns, Context context) throws IOException, InterruptedException
+ public void map(ByteBuffer key, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> columns, Context context) throws IOException, InterruptedException
{
long sum = 0;
- for (Cell cell : columns.values())
+ for (ColumnFamilyRecordReader.Column column : columns.values())
{
- logger.debug("read " + key + ":" + cell.name() + " from " + context.getInputSplit());
- sum += ByteBufferUtil.toLong(cell.value());
+ logger.debug("read " + key + ":" + ByteBufferUtil.string(column.name) + " from " + context.getInputSplit());
+ sum += ByteBufferUtil.toLong(column.value);
}
context.write(new Text(ByteBufferUtil.string(key)), new LongWritable(sum));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index f89825f..4662fa5 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -25,7 +25,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.db.Cell;
import org.apache.cassandra.thrift.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
@@ -57,7 +56,7 @@ import org.apache.thrift.transport.TTransportException;
* The default split size is 64k rows.
*/
@Deprecated
-public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Cell>>
+public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>>
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
@@ -91,12 +90,12 @@ public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<Byt
return client;
}
- public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+ public RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
{
return new ColumnFamilyRecordReader();
}
- public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
+ public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
{
TaskAttemptContext tac = HadoopCompat.newMapContext(
jobConf,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index c103d75..aee730d 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -29,9 +29,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.schema.LegacySchemaTables;
import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.BufferCell;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.TypeParser;
@@ -49,8 +46,8 @@ import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
@Deprecated
-public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
- implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
+public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>>
+ implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>>
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
@@ -58,7 +55,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
private ColumnFamilySplit split;
private RowIterator iter;
- private Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> currentRow;
+ private Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> currentRow;
private SlicePredicate predicate;
private boolean isEmptyPredicate;
private int totalRowCount; // total number of rows to fetch
@@ -98,7 +95,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return currentRow.left;
}
- public SortedMap<ByteBuffer, Cell> getCurrentValue()
+ public SortedMap<ByteBuffer, Column> getCurrentValue()
{
return currentRow.right;
}
@@ -216,7 +213,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return split.getLocations()[0];
}
- private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>>
+ private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
{
protected List<KeySlice> rows;
protected int totalRead = 0;
@@ -283,50 +280,48 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return totalRead;
}
- protected List<Cell> unthriftify(ColumnOrSuperColumn cosc)
+ protected List<Pair<ByteBuffer, Column>> unthriftify(ColumnOrSuperColumn cosc)
{
if (cosc.counter_column != null)
- return Collections.<Cell>singletonList(unthriftifyCounter(cosc.counter_column));
+ return Collections.singletonList(unthriftifyCounter(cosc.counter_column));
if (cosc.counter_super_column != null)
return unthriftifySuperCounter(cosc.counter_super_column);
if (cosc.super_column != null)
return unthriftifySuper(cosc.super_column);
assert cosc.column != null;
- return Collections.<Cell>singletonList(unthriftifySimple(cosc.column));
+ return Collections.singletonList(unthriftifySimple(cosc.column));
}
- private List<Cell> unthriftifySuper(SuperColumn super_column)
+ private List<Pair<ByteBuffer, Column>> unthriftifySuper(SuperColumn super_column)
{
- List<Cell> cells = new ArrayList<Cell>(super_column.columns.size());
+ List<Pair<ByteBuffer, Column>> columns = new ArrayList<>(super_column.columns.size());
for (org.apache.cassandra.thrift.Column column : super_column.columns)
{
- Cell c = unthriftifySimple(column);
- cells.add(c.withUpdatedName(CellNames.simpleDense(CompositeType.build(super_column.name, c.name().toByteBuffer()))));
+ Pair<ByteBuffer, Column> c = unthriftifySimple(column);
+ columns.add(Pair.create(CompositeType.build(super_column.name, c.left), c.right));
}
- return cells;
+ return columns;
}
- protected Cell unthriftifySimple(org.apache.cassandra.thrift.Column column)
+ protected Pair<ByteBuffer, Column> unthriftifySimple(org.apache.cassandra.thrift.Column column)
{
- return new BufferCell(CellNames.simpleDense(column.name), column.value, column.timestamp);
+ return Pair.create(column.name, Column.fromRegularColumn(column));
}
- private Cell unthriftifyCounter(CounterColumn column)
+ private Pair<ByteBuffer, Column> unthriftifyCounter(CounterColumn column)
{
- //CounterColumns read the counterID from the System keyspace, so need the StorageService running and access
- //to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Cell.
- return new BufferCell(CellNames.simpleDense(column.name), ByteBufferUtil.bytes(column.value), 0);
+ return Pair.create(column.name, Column.fromCounterColumn(column));
}
- private List<Cell> unthriftifySuperCounter(CounterSuperColumn super_column)
+ private List<Pair<ByteBuffer, Column>> unthriftifySuperCounter(CounterSuperColumn super_column)
{
- List<Cell> cells = new ArrayList<Cell>(super_column.columns.size());
+ List<Pair<ByteBuffer, Column>> columns = new ArrayList<>(super_column.columns.size());
for (CounterColumn column : super_column.columns)
{
- Cell c = unthriftifyCounter(column);
- cells.add(c.withUpdatedName(CellNames.simpleDense(CompositeType.build(super_column.name, c.name().toByteBuffer()))));
+ Pair<ByteBuffer, Column> c = unthriftifyCounter(column);
+ columns.add(Pair.create(CompositeType.build(super_column.name, c.left), c.right));
}
- return cells;
+ return columns;
}
}
@@ -405,7 +400,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
}
}
- protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
+ protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
{
maybeInit();
if (rows == null)
@@ -414,12 +409,12 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
totalRead++;
KeySlice ks = rows.get(i++);
AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator;
- SortedMap<ByteBuffer, Cell> map = new TreeMap<ByteBuffer, Cell>(comp);
+ SortedMap<ByteBuffer, Column> map = new TreeMap<>(comp);
for (ColumnOrSuperColumn cosc : ks.columns)
{
- List<Cell> cells = unthriftify(cosc);
- for (Cell cell : cells)
- map.put(cell.name().toByteBuffer(), cell);
+ List<Pair<ByteBuffer, Column>> columns = unthriftify(cosc);
+ for (Pair<ByteBuffer, Column> column : columns)
+ map.put(column.left, column.right);
}
return Pair.create(ks.key, map);
}
@@ -427,7 +422,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
private class WideRowIterator extends RowIterator
{
- private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>> wideColumns;
+ private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>> wideColumns;
private ByteBuffer lastColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
private ByteBuffer lastCountedKey = ByteBufferUtil.EMPTY_BYTE_BUFFER;
@@ -476,13 +471,13 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
}
}
- protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
+ protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
{
maybeInit();
if (rows == null)
return endOfData();
- Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> next = wideColumns.next();
+ Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next = wideColumns.next();
lastColumn = next.right.keySet().iterator().next().duplicate();
maybeIncreaseRowCounter(next);
@@ -494,7 +489,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
* Increases the row counter only if we really moved to the next row.
* @param next just fetched row slice
*/
- private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> next)
+ private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next)
{
ByteBuffer currentKey = next.left;
if (!currentKey.equals(lastCountedKey))
@@ -504,7 +499,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
}
}
- private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>>
+ private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
{
private final Iterator<KeySlice> rows;
private Iterator<ColumnOrSuperColumn> columns;
@@ -525,7 +520,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
columns = currentRow.columns.iterator();
}
- protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
+ protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
{
AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator;
while (true)
@@ -533,20 +528,20 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
if (columns.hasNext())
{
ColumnOrSuperColumn cosc = columns.next();
- SortedMap<ByteBuffer, Cell> map;
- List<Cell> cells = unthriftify(cosc);
- if (cells.size() == 1)
+ SortedMap<ByteBuffer, Column> map;
+ List<Pair<ByteBuffer, Column>> columns = unthriftify(cosc);
+ if (columns.size() == 1)
{
- map = ImmutableSortedMap.of(cells.get(0).name().toByteBuffer(), cells.get(0));
+ map = ImmutableSortedMap.of(columns.get(0).left, columns.get(0).right);
}
else
{
assert isSuper;
- map = new TreeMap<ByteBuffer, Cell>(comp);
- for (Cell cell : cells)
- map.put(cell.name().toByteBuffer(), cell);
+ map = new TreeMap<>(comp);
+ for (Pair<ByteBuffer, Column> column : columns)
+ map.put(column.left, column.right);
}
- return Pair.<ByteBuffer, SortedMap<ByteBuffer, Cell>>create(currentRow.key, map);
+ return Pair.create(currentRow.key, map);
}
if (!rows.hasNext())
@@ -563,7 +558,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
// to the old. Thus, expect a small performance hit.
// And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
// and ColumnFamilyRecordReader don't support them, it should be fine for now.
- public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Cell> value) throws IOException
+ public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Column> value) throws IOException
{
if (this.nextKeyValue())
{
@@ -584,13 +579,37 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return ByteBuffer.wrap(new byte[this.keyBufferSize]);
}
- public SortedMap<ByteBuffer, Cell> createValue()
+ public SortedMap<ByteBuffer, Column> createValue()
{
- return new TreeMap<ByteBuffer, Cell>();
+ return new TreeMap<>();
}
public long getPos() throws IOException
{
return iter.rowsRead();
}
+
+ public static final class Column
+ {
+ public final ByteBuffer name;
+ public final ByteBuffer value;
+ public final long timestamp;
+
+ private Column(ByteBuffer name, ByteBuffer value, long timestamp)
+ {
+ this.name = name;
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ static Column fromRegularColumn(org.apache.cassandra.thrift.Column input)
+ {
+ return new Column(input.name, input.value, input.timestamp);
+ }
+
+ static Column fromCounterColumn(org.apache.cassandra.thrift.CounterColumn input)
+ {
+ return new Column(input.name, ByteBufferUtil.bytes(input.value), 0);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
deleted file mode 100644
index 263e6c0..0000000
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ /dev/null
@@ -1,796 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.pig;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.math.BigInteger;
-import java.net.URLDecoder;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.*;
-
-import org.apache.cassandra.transport.Server;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.schema.LegacySchemaTables;
-import org.apache.cassandra.serializers.CollectionSerializer;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.pig.*;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.*;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-
-/**
- * A LoadStoreFunc for retrieving data from and storing data to Cassandra
- */
-public abstract class AbstractCassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
-{
-
- protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
-
- // system environment variables that can be set to configure connection info:
- // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
- public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
- public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
- public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
- public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
- public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
- public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
- public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
- public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
- public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
- public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
- public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
- public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
-
- protected String DEFAULT_INPUT_FORMAT;
- protected String DEFAULT_OUTPUT_FORMAT;
-
- public final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
-
- private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraStorage.class);
-
- protected String username;
- protected String password;
- protected String keyspace;
- protected String column_family;
- protected String loadSignature;
- protected String storeSignature;
-
- protected Configuration conf;
- protected String inputFormatClass;
- protected String outputFormatClass;
- protected int splitSize = 64 * 1024;
- protected String partitionerClass;
- protected boolean usePartitionFilter = false;
- protected String initHostAddress;
- protected String rpcPort;
- protected int nativeProtocolVersion = 1;
-
-
- public AbstractCassandraStorage()
- {
- super();
- }
-
- /** Deconstructs a composite type to a Tuple. */
- protected Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException
- {
- List<CompositeComponent> result = comparator.deconstruct(name);
- Tuple t = TupleFactory.getInstance().newTuple(result.size());
- for (int i=0; i<result.size(); i++)
- setTupleValue(t, i, cassandraToObj(result.get(i).comparator, result.get(i).value));
-
- return t;
- }
-
- /** convert a column to a tuple */
- protected Tuple columnToTuple(Cell col, CfInfo cfInfo, AbstractType comparator) throws IOException
- {
- CfDef cfDef = cfInfo.cfDef;
- Tuple pair = TupleFactory.getInstance().newTuple(2);
-
- ByteBuffer colName = col.name().toByteBuffer();
-
- // name
- if(comparator instanceof AbstractCompositeType)
- setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,colName));
- else
- setTupleValue(pair, 0, cassandraToObj(comparator, colName));
-
- // value
- Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
- {
- ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(colName);
- colName = names[names.length-1];
- }
- if (validators.get(colName) == null)
- {
- Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
- setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
- }
- else
- setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value()));
- return pair;
- }
-
- /** set the value to the position of the tuple */
- protected void setTupleValue(Tuple pair, int position, Object value) throws ExecException
- {
- if (value instanceof BigInteger)
- pair.set(position, ((BigInteger) value).intValue());
- else if (value instanceof ByteBuffer)
- pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
- else if (value instanceof UUID)
- pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value)));
- else if (value instanceof Date)
- pair.set(position, TimestampType.instance.decompose((Date) value).getLong());
- else
- pair.set(position, value);
- }
-
- /** get the columnfamily definition for the signature */
- protected CfInfo getCfInfo(String signature) throws IOException
- {
- UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- String prop = property.getProperty(signature);
- CfInfo cfInfo = new CfInfo();
- cfInfo.cfDef = cfdefFromString(prop.substring(2));
- cfInfo.compactCqlTable = prop.charAt(0) == '1' ? true : false;
- cfInfo.cql3Table = prop.charAt(1) == '1' ? true : false;
- return cfInfo;
- }
-
- /** construct a map to store the mashaller type to cassandra data type mapping */
- protected Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
- {
- Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class);
- AbstractType comparator;
- AbstractType subcomparator;
- AbstractType default_validator;
- AbstractType key_validator;
-
- comparator = parseType(cfDef.getComparator_type());
- subcomparator = parseType(cfDef.getSubcomparator_type());
- default_validator = parseType(cfDef.getDefault_validation_class());
- key_validator = parseType(cfDef.getKey_validation_class());
-
- marshallers.put(MarshallerType.COMPARATOR, comparator);
- marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator);
- marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator);
- marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator);
- return marshallers;
- }
-
- /** get the validators */
- protected Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
- {
- Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
- for (ColumnDef cd : cfDef.getColumn_metadata())
- {
- if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
- {
- AbstractType validator = null;
- try
- {
- validator = TypeParser.parse(cd.getValidation_class());
- if (validator instanceof CounterColumnType)
- validator = LongType.instance;
- validators.put(cd.name, validator);
- }
- catch (ConfigurationException | SyntaxException e)
- {
- throw new IOException(e);
- }
- }
- }
- return validators;
- }
-
- /** parse the string to a cassandra data type */
- protected AbstractType parseType(String type) throws IOException
- {
- try
- {
- // always treat counters like longs, specifically CCT.compose is not what we need
- if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
- return LongType.instance;
- return TypeParser.parse(type);
- }
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
- catch (SyntaxException e)
- {
- throw new IOException(e);
- }
- }
-
- @Override
- public InputFormat getInputFormat() throws IOException
- {
- try
- {
- return FBUtilities.construct(inputFormatClass, "inputformat");
- }
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
- }
-
- /** decompose the query to store the parameters in a map */
- public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
- {
- String[] params = query.split("&");
- Map<String, String> map = new HashMap<String, String>(params.length);
- for (String param : params)
- {
- String[] keyValue = param.split("=");
- map.put(keyValue[0], URLDecoder.decode(keyValue[1],"UTF-8"));
- }
- return map;
- }
-
- /** set hadoop cassandra connection settings */
- protected void setConnectionInformation() throws IOException
- {
- if (System.getenv(PIG_RPC_PORT) != null)
- {
- ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
- ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
- }
-
- if (System.getenv(PIG_INPUT_RPC_PORT) != null)
- ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT));
- if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
- ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT));
-
- if (System.getenv(PIG_INITIAL_ADDRESS) != null)
- {
- ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
- ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
- }
- if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
- ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS));
- if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
- ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
-
- if (System.getenv(PIG_PARTITIONER) != null)
- {
- ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
- ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
- }
- if(System.getenv(PIG_INPUT_PARTITIONER) != null)
- ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
- if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
- ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
- if (System.getenv(PIG_INPUT_FORMAT) != null)
- inputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_INPUT_FORMAT));
- else
- inputFormatClass = DEFAULT_INPUT_FORMAT;
- if (System.getenv(PIG_OUTPUT_FORMAT) != null)
- outputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_OUTPUT_FORMAT));
- else
- outputFormatClass = DEFAULT_OUTPUT_FORMAT;
- }
-
- /** get the full class name */
- protected String getFullyQualifiedClassName(String classname)
- {
- return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
- }
-
- /** get pig type for the cassandra data type*/
- protected byte getPigType(AbstractType type)
- {
- if (type instanceof LongType || type instanceof DateType || type instanceof TimestampType) // DateType is bad and it should feel bad
- return DataType.LONG;
- else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger
- return DataType.INTEGER;
- else if (type instanceof AsciiType || type instanceof UTF8Type || type instanceof DecimalType || type instanceof InetAddressType)
- return DataType.CHARARRAY;
- else if (type instanceof FloatType)
- return DataType.FLOAT;
- else if (type instanceof DoubleType)
- return DataType.DOUBLE;
- else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
- return DataType.TUPLE;
-
- return DataType.BYTEARRAY;
- }
-
- public ResourceStatistics getStatistics(String location, Job job)
- {
- return null;
- }
-
- @Override
- public String relativeToAbsolutePath(String location, Path curDir) throws IOException
- {
- return location;
- }
-
- @Override
- public void setUDFContextSignature(String signature)
- {
- this.loadSignature = signature;
- }
-
- /** StoreFunc methods */
- public void setStoreFuncUDFContextSignature(String signature)
- {
- this.storeSignature = signature;
- }
-
- public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
- {
- return relativeToAbsolutePath(location, curDir);
- }
-
- /** output format */
- public OutputFormat getOutputFormat() throws IOException
- {
- try
- {
- return FBUtilities.construct(outputFormatClass, "outputformat");
- }
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
- }
-
- public void checkSchema(ResourceSchema schema) throws IOException
- {
- // we don't care about types, they all get casted to ByteBuffers
- }
-
- protected abstract ByteBuffer nullToBB();
-
- /** convert object to ByteBuffer */
- protected ByteBuffer objToBB(Object o)
- {
- if (o == null)
- return nullToBB();
- if (o instanceof java.lang.String)
- return ByteBuffer.wrap(new DataByteArray((String)o).get());
- if (o instanceof Integer)
- return Int32Type.instance.decompose((Integer)o);
- if (o instanceof Long)
- return LongType.instance.decompose((Long)o);
- if (o instanceof Float)
- return FloatType.instance.decompose((Float)o);
- if (o instanceof Double)
- return DoubleType.instance.decompose((Double)o);
- if (o instanceof UUID)
- return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
- if(o instanceof Tuple) {
- List<Object> objects = ((Tuple)o).getAll();
- //collections
- if (objects.size() > 0 && objects.get(0) instanceof String)
- {
- String collectionType = (String) objects.get(0);
- if ("set".equalsIgnoreCase(collectionType) ||
- "list".equalsIgnoreCase(collectionType))
- return objToListOrSetBB(objects.subList(1, objects.size()));
- else if ("map".equalsIgnoreCase(collectionType))
- return objToMapBB(objects.subList(1, objects.size()));
-
- }
- return objToCompositeBB(objects);
- }
-
- return ByteBuffer.wrap(((DataByteArray) o).get());
- }
-
- private ByteBuffer objToListOrSetBB(List<Object> objects)
- {
- List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
- for(Object sub : objects)
- {
- ByteBuffer buffer = objToBB(sub);
- serialized.add(buffer);
- }
- // NOTE: using protocol v1 serialization format for collections so as to not break
- // compatibility. Not sure if that's the right thing.
- return CollectionSerializer.pack(serialized, objects.size(), Server.VERSION_1);
- }
-
- private ByteBuffer objToMapBB(List<Object> objects)
- {
- List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
- for(Object sub : objects)
- {
- List<Object> keyValue = ((Tuple)sub).getAll();
- for (Object entry: keyValue)
- {
- ByteBuffer buffer = objToBB(entry);
- serialized.add(buffer);
- }
- }
- // NOTE: using protocol v1 serialization format for collections so as to not break
- // compatibility. Not sure if that's the right thing.
- return CollectionSerializer.pack(serialized, objects.size(), Server.VERSION_1);
- }
-
- private ByteBuffer objToCompositeBB(List<Object> objects)
- {
- List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
- int totalLength = 0;
- for(Object sub : objects)
- {
- ByteBuffer buffer = objToBB(sub);
- serialized.add(buffer);
- totalLength += 2 + buffer.remaining() + 1;
- }
- ByteBuffer out = ByteBuffer.allocate(totalLength);
- for (ByteBuffer bb : serialized)
- {
- int length = bb.remaining();
- out.put((byte) ((length >> 8) & 0xFF));
- out.put((byte) (length & 0xFF));
- out.put(bb);
- out.put((byte) 0);
- }
- out.flip();
- return out;
- }
-
- public void cleanupOnFailure(String failure, Job job)
- {
- }
-
- public void cleanupOnSuccess(String location, Job job) throws IOException {
- }
-
-
- /** Methods to get the column family schema from Cassandra */
- protected void initSchema(String signature) throws IOException
- {
- Properties properties = UDFContext.getUDFContext().getUDFProperties(AbstractCassandraStorage.class);
-
- // Only get the schema if we haven't already gotten it
- if (!properties.containsKey(signature))
- {
- try
- {
- Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
- client.set_keyspace(keyspace);
-
- if (username != null && password != null)
- {
- Map<String, String> credentials = new HashMap<String, String>(2);
- credentials.put(PasswordAuthenticator.USERNAME_KEY, username);
- credentials.put(PasswordAuthenticator.PASSWORD_KEY, password);
-
- try
- {
- client.login(new AuthenticationRequest(credentials));
- }
- catch (AuthenticationException e)
- {
- logger.error("Authentication exception: invalid username and/or password");
- throw new IOException(e);
- }
- }
-
- // compose the CfDef for the columfamily
- CfInfo cfInfo = getCfInfo(client);
-
- if (cfInfo.cfDef != null)
- {
- StringBuilder sb = new StringBuilder();
- sb.append(cfInfo.compactCqlTable ? 1 : 0).append(cfInfo.cql3Table ? 1: 0).append(cfdefToString(cfInfo.cfDef));
- properties.setProperty(signature, sb.toString());
- }
- else
- throw new IOException(String.format("Table '%s' not found in keyspace '%s'",
- column_family,
- keyspace));
- }
- catch (Exception e)
- {
- throw new IOException(e);
- }
- }
- }
-
- /** convert CfDef to string */
- protected static String cfdefToString(CfDef cfDef) throws IOException
- {
- assert cfDef != null;
- // this is so awful it's kind of cool!
- TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
- try
- {
- return Hex.bytesToHex(serializer.serialize(cfDef));
- }
- catch (TException e)
- {
- throw new IOException(e);
- }
- }
-
- /** convert string back to CfDef */
- protected static CfDef cfdefFromString(String st) throws IOException
- {
- assert st != null;
- TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
- CfDef cfDef = new CfDef();
- try
- {
- deserializer.deserialize(cfDef, Hex.hexToBytes(st));
- }
- catch (TException e)
- {
- throw new IOException(e);
- }
- return cfDef;
- }
-
- /** return the CfInfo for the column family */
- protected CfInfo getCfInfo(Cassandra.Client client)
- throws InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException,
- TException,
- NotFoundException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- IOException
- {
- // get CF meta data
- String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator, key_aliases " +
- "FROM %s.%s " +
- "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
- SystemKeyspace.NAME,
- LegacySchemaTables.COLUMNFAMILIES,
- keyspace,
- column_family);
-
- CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
- if (result == null || result.rows == null || result.rows.isEmpty())
- return null;
-
- Iterator<CqlRow> iteraRow = result.rows.iterator();
- CfDef cfDef = new CfDef();
- cfDef.keyspace = keyspace;
- cfDef.name = column_family;
- boolean cql3Table = false;
- if (iteraRow.hasNext())
- {
- CqlRow cqlRow = iteraRow.next();
-
- cfDef.column_type = ByteBufferUtil.string(cqlRow.columns.get(0).value);
- cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(1).value);
- ByteBuffer subComparator = cqlRow.columns.get(2).value;
- if (subComparator != null)
- cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
- cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value);
- cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value);
- String keyAliases = ByteBufferUtil.string(cqlRow.columns.get(5).value);
- if (FBUtilities.fromJsonList(keyAliases).size() > 0)
- cql3Table = true;
- }
- cfDef.column_metadata = getColumnMetadata(client);
- CfInfo cfInfo = new CfInfo();
- cfInfo.cfDef = cfDef;
- if (cql3Table && !(parseType(cfDef.comparator_type) instanceof AbstractCompositeType))
- cfInfo.compactCqlTable = true;
- if (cql3Table)
- cfInfo.cql3Table = true;;
- return cfInfo;
- }
-
- /** get a list of columns */
- protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client)
- throws InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException,
- TException,
- CharacterCodingException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- NotFoundException;
-
- /** get column meta data */
- protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn)
- throws InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException,
- TException,
- CharacterCodingException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- NotFoundException
- {
- String query = String.format("SELECT column_name, validator, index_type, type " +
- "FROM %s.%s " +
- "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
- SystemKeyspace.NAME,
- LegacySchemaTables.COLUMNS,
- keyspace,
- column_family);
-
- CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
- List<CqlRow> rows = result.rows;
- List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
- if (rows == null || rows.isEmpty())
- {
- // if CassandraStorage, just return the empty list
- if (cassandraStorage)
- return columnDefs;
-
- // otherwise for CqlNativeStorage, check metadata for classic thrift tables
- CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
- for (ColumnDefinition def : cfm.regularAndStaticColumns())
- {
- ColumnDef cDef = new ColumnDef();
- String columnName = def.name.toString();
- String type = def.type.toString();
- logger.debug("name: {}, type: {} ", columnName, type);
- cDef.name = ByteBufferUtil.bytes(columnName);
- cDef.validation_class = type;
- columnDefs.add(cDef);
- }
- // we may not need to include the value column for compact tables as we
- // could have already processed it as schema_columnfamilies.value_alias
- if (columnDefs.size() == 0 && includeCompactValueColumn && cfm.compactValueColumn() != null)
- {
- ColumnDefinition def = cfm.compactValueColumn();
- if ("value".equals(def.name.toString()))
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = def.name.bytes;
- cDef.validation_class = def.type.toString();
- columnDefs.add(cDef);
- }
- }
- return columnDefs;
- }
-
- Iterator<CqlRow> iterator = rows.iterator();
- while (iterator.hasNext())
- {
- CqlRow row = iterator.next();
- ColumnDef cDef = new ColumnDef();
- String type = ByteBufferUtil.string(row.getColumns().get(3).value);
- if (!type.equals("regular"))
- continue;
- cDef.setName(ByteBufferUtil.clone(row.getColumns().get(0).value));
- cDef.validation_class = ByteBufferUtil.string(row.getColumns().get(1).value);
- ByteBuffer indexType = row.getColumns().get(2).value;
- if (indexType != null)
- cDef.index_type = getIndexType(ByteBufferUtil.string(indexType));
- columnDefs.add(cDef);
- }
- return columnDefs;
- }
-
- /** get index type from string */
- protected IndexType getIndexType(String type)
- {
- type = type.toLowerCase();
- if ("keys".equals(type))
- return IndexType.KEYS;
- else if("custom".equals(type))
- return IndexType.CUSTOM;
- else if("composites".equals(type))
- return IndexType.COMPOSITES;
- else
- return null;
- }
-
- /** return partition keys */
- public String[] getPartitionKeys(String location, Job job) throws IOException
- {
- if (!usePartitionFilter)
- return null;
- List<ColumnDef> indexes = getIndexes();
- String[] partitionKeys = new String[indexes.size()];
- for (int i = 0; i < indexes.size(); i++)
- {
- partitionKeys[i] = new String(indexes.get(i).getName());
- }
- return partitionKeys;
- }
-
- /** get a list of columns with defined index*/
- protected List<ColumnDef> getIndexes() throws IOException
- {
- CfDef cfdef = getCfInfo(loadSignature).cfDef;
- List<ColumnDef> indexes = new ArrayList<ColumnDef>();
- for (ColumnDef cdef : cfdef.column_metadata)
- {
- if (cdef.index_type != null)
- indexes.add(cdef);
- }
- return indexes;
- }
-
- /** get CFMetaData of a column family */
- protected CFMetaData getCFMetaData(String ks, String cf, Cassandra.Client client)
- throws NotFoundException,
- InvalidRequestException,
- TException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException
- {
- KsDef ksDef = client.describe_keyspace(ks);
- for (CfDef cfDef : ksDef.cf_defs)
- {
- if (cfDef.name.equalsIgnoreCase(cf))
- return ThriftConversion.fromThrift(cfDef);
- }
- return null;
- }
-
- protected Object cassandraToObj(AbstractType validator, ByteBuffer value)
- {
- if (validator instanceof DecimalType || validator instanceof InetAddressType)
- return validator.getString(value);
-
- if (validator instanceof CollectionType)
- {
- // For CollectionType, the compose() method assumes the v3 protocol format of collection, which
- // is not correct here since we query using the CQL-over-thrift interface which use the pre-v3 format
- return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion);
- }
-
- return validator.compose(value);
- }
-
- protected static class CfInfo
- {
- boolean compactCqlTable = false;
- boolean cql3Table = false;
- CfDef cfDef;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 1ad80b7..5d354a7 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -30,12 +30,12 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.PasswordAuthenticator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.hadoop.ColumnFamilyRecordReader;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.HadoopCompat;
import org.apache.cassandra.schema.LegacySchemaTables;
@@ -83,7 +83,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
private boolean slice_reverse = false;
private boolean allow_deletes = false;
- private RecordReader<ByteBuffer, Map<ByteBuffer, Cell>> reader;
+ private RecordReader<ByteBuffer, Map<ByteBuffer, ColumnFamilyRecordReader.Column>> reader;
private RecordWriter<ByteBuffer, List<Mutation>> writer;
private boolean widerows = false;
@@ -113,7 +113,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
// wide row hacks
private ByteBuffer lastKey;
- private Map<ByteBuffer, Cell> lastRow;
+ private Map<ByteBuffer, ColumnFamilyRecordReader.Column> lastRow;
private boolean hasNext = true;
public CassandraStorage()
@@ -164,7 +164,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
key = reader.getCurrentKey();
tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
}
- for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
@@ -202,7 +202,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
else
addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
- for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
@@ -216,17 +216,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
else
addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
}
- SortedMap<ByteBuffer, Cell> row = (SortedMap<ByteBuffer, Cell>)reader.getCurrentValue();
+ SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> row =
+ (SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>)reader.getCurrentValue();
if (lastRow != null) // prepend what was read last time
{
- for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
}
- for (Map.Entry<ByteBuffer, Cell> entry : row.entrySet())
+ for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : row.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
@@ -251,7 +252,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = reader.getCurrentKey();
- Map<ByteBuffer, Cell> cf = reader.getCurrentValue();
+ Map<ByteBuffer, ColumnFamilyRecordReader.Column> cf = reader.getCurrentValue();
assert key != null && cf != null;
// output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
@@ -285,7 +286,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
added.put(cdef.name, true);
}
// now add all the other columns
- for (Map.Entry<ByteBuffer, Cell> entry : cf.entrySet())
+ for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : cf.entrySet())
{
if (!added.containsKey(entry.getKey()))
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
@@ -1338,27 +1339,25 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
}
/** convert a column to a tuple */
- protected Tuple columnToTuple(Cell col, CfDef cfDef, AbstractType comparator) throws IOException
+ protected Tuple columnToTuple(ColumnFamilyRecordReader.Column column, CfDef cfDef, AbstractType comparator) throws IOException
{
Tuple pair = TupleFactory.getInstance().newTuple(2);
- ByteBuffer colName = col.name().toByteBuffer();
-
// name
if(comparator instanceof AbstractCompositeType)
- StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, colName));
+ StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, column.name));
else
- StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, colName, nativeProtocolVersion));
+ StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, column.name, nativeProtocolVersion));
// value
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- if (validators.get(colName) == null)
+ if (validators.get(column.name) == null)
{
Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
- StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value(), nativeProtocolVersion));
+ StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), column.value, nativeProtocolVersion));
}
else
- StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(colName), col.value(), nativeProtocolVersion));
+ StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(column.name), column.value, nativeProtocolVersion));
return pair;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 537f30c..dc3c174 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -34,9 +34,6 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import org.apache.cassandra.db.BufferCell;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.AuthenticationException;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -159,9 +156,9 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
ByteBuffer columnValue = row.getBytesUnsafe(cdef.getName());
if (columnValue != null)
{
- Cell cell = new BufferCell(CellNames.simpleDense(ByteBufferUtil.bytes(cdef.getName())), columnValue);
AbstractType<?> validator = getValidatorMap(tableMetadata).get(ByteBufferUtil.bytes(cdef.getName()));
- setTupleValue(tuple, i, cqlColumnToObj(cell, tableMetadata), validator);
+ setTupleValue(tuple, i, cqlColumnToObj(ByteBufferUtil.bytes(cdef.getName()), columnValue,
+ tableMetadata), validator);
}
else
tuple.set(i, null);
@@ -176,12 +173,11 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
}
/** convert a cql column to an object */
- private Object cqlColumnToObj(Cell col, TableInfo cfDef) throws IOException
+ private Object cqlColumnToObj(ByteBuffer name, ByteBuffer columnValue, TableInfo cfDef) throws IOException
{
// standard
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- ByteBuffer cellName = col.name().toByteBuffer();
- return StorageHelper.cassandraToObj(validators.get(cellName), col.value(), nativeProtocolVersion);
+ return StorageHelper.cassandraToObj(validators.get(name), columnValue, nativeProtocolVersion);
}
/** set the value to the position of the tuple */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/test/conf/logback-test.xml
----------------------------------------------------------------------
diff --git a/test/conf/logback-test.xml b/test/conf/logback-test.xml
index 8d99aa2..6d75aaf 100644
--- a/test/conf/logback-test.xml
+++ b/test/conf/logback-test.xml
@@ -61,7 +61,9 @@
<level>WARN</level>
</filter>
</appender>
-
+
+ <logger name="org.apache.hadoop" level="WARN"/>
+
<root level="DEBUG">
<appender-ref ref="ASYNCFILE" />
<appender-ref ref="STDERR" />