You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/07 02:46:51 UTC

git commit: Example for cassandra CQL read/write from spark

Repository: spark
Updated Branches:
  refs/heads/master 328c73d03 -> 9ae919c02


Example for cassandra CQL read/write from spark

Cassandra read/write using CqlPagingInputFormat/CqlOutputFormat

Author: anitatailor <ta...@gmail.com>

Closes #87 from anitatailor/master and squashes the following commits:

3493f81 [anitatailor] Fixed scala style as per review
19480b7 [anitatailor] Example for cassandra CQL read/write from spark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ae919c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ae919c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ae919c0

Branch: refs/heads/master
Commit: 9ae919c02f7b7d069215e8dc6cafef0ec79c9d5f
Parents: 328c73d
Author: anitatailor <ta...@gmail.com>
Authored: Thu Mar 6 17:46:43 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Mar 6 17:46:43 2014 -0800

----------------------------------------------------------------------
 .../spark/examples/CassandraCQLTest.scala       | 137 +++++++++++++++++++
 1 file changed, 137 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9ae919c0/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
new file mode 100644
index 0000000..ee283ce
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import java.nio.ByteBuffer
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+import scala.collection.immutable.Map
+import org.apache.cassandra.hadoop.ConfigHelper
+import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
+import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
+import org.apache.cassandra.utils.ByteBufferUtil
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+
+/*
+  Need to create following keyspace and column family in cassandra before running this example
+  Start CQL shell using ./bin/cqlsh and execute following commands
+  CREATE KEYSPACE retail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+  use retail;
+  CREATE TABLE salecount (prod_id text, sale_count int, PRIMARY KEY (prod_id));
+  CREATE TABLE ordercf (user_id text,
+    time timestamp,
+    prod_id text,
+    quantity int,
+    PRIMARY KEY (user_id, time));
+  INSERT INTO ordercf (user_id,
+    time,
+    prod_id,
+    quantity) VALUES  ('bob', 1385983646000, 'iphone', 1);
+  INSERT INTO ordercf (user_id,
+    time,
+    prod_id,
+    quantity) VALUES ('tom', 1385983647000, 'samsung', 4);
+  INSERT INTO ordercf (user_id,
+    time,
+    prod_id,
+    quantity) VALUES ('dora', 1385983648000, 'nokia', 2);
+  INSERT INTO ordercf (user_id,
+    time,
+    prod_id,
+    quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
+*/
+ 
+/**
+ * This example demonstrates how to read and write to cassandra column family created using CQL3
+ * using Spark.
+ * Parameters : <spark_master> <cassandra_node> <cassandra_port>
+ * Usage: ./bin/run-example org.apache.spark.examples.CassandraCQLTest local[2] localhost 9160
+ *
+ */
+object CassandraCQLTest {
+
+  def main(args: Array[String]) {
+    val sc = new SparkContext(args(0),
+               "CQLTestApp",
+               System.getenv("SPARK_HOME"),
+               SparkContext.jarOfClass(this.getClass))
+    val cHost: String = args(1)
+    val cPort: String = args(2)
+    val KeySpace = "retail"
+    val InputColumnFamily = "ordercf"
+    val OutputColumnFamily = "salecount"
+
+    val job = new Job()
+    job.setInputFormatClass(classOf[CqlPagingInputFormat])
+    ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
+    ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
+    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
+    ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
+    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")
+
+    /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */
+
+    /** An UPDATE writes one or more columns to a record in a Cassandra column family */
+    val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
+    CqlConfigHelper.setOutputCql(job.getConfiguration(), query)
+
+    job.setOutputFormatClass(classOf[CqlOutputFormat])
+    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily)
+    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
+    ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
+    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
+
+    val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
+      classOf[CqlPagingInputFormat],
+      classOf[java.util.Map[String,ByteBuffer]],
+      classOf[java.util.Map[String,ByteBuffer]])
+
+    println("Count: " + casRdd.count)
+    val productSaleRDD = casRdd.map {
+      case (key, value) => {
+        (ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity")))
+      }
+    }
+    val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
+    aggregatedRDD.collect().foreach {
+      case (productId, saleCount) => println(productId + ":" + saleCount)
+    }
+
+    val casoutputCF  = aggregatedRDD.map {
+      case (productId, saleCount) => {
+        val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
+        val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
+        var outColFamVal = new ListBuffer[ByteBuffer]
+        outColFamVal += ByteBufferUtil.bytes(saleCount)
+        val outVal: java.util.List[ByteBuffer] = outColFamVal
+       (outKey, outVal)
+      }
+    }
+
+    casoutputCF.saveAsNewAPIHadoopFile(
+        KeySpace,
+        classOf[java.util.Map[String, ByteBuffer]],
+        classOf[java.util.List[ByteBuffer]],
+        classOf[CqlOutputFormat],
+        job.getConfiguration()
+      )
+  }
+}