You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/08/11 03:11:40 UTC
[07/18] git commit: Version and licenses for 2.0.10 release
Version and licenses for 2.0.10 release
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2af8c9da
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2af8c9da
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2af8c9da
Branch: refs/heads/trunk
Commit: 2af8c9da5669f860f1339d789a0f3a0c4f65e5c2
Parents: 7895273
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Aug 8 10:54:41 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Aug 8 17:48:55 2014 +0200
----------------------------------------------------------------------
NEWS.txt | 5 +-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 308 +++++++++++++++++++
2 files changed, 310 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2af8c9da/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 79212f8..9b521e4 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -72,13 +72,12 @@ Upgrading
2.0.10
-====
+======
New features
------------
- CqlPaginRecordReader and CqlPagingInputFormat have both been removed.
Use CqlInputFormat instead.
- - If you are using Leveled Compaction, you can now disable doing
- size-tiered
+ - If you are using Leveled Compaction, you can now disable doing size-tiered
compaction in L0 by starting Cassandra with -Dcassandra.disable_stcs_in_l0
(see CASSANDRA-6621 for details).
- Shuffle and taketoken have been removed. For clusters that choose to
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2af8c9da/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
new file mode 100644
index 0000000..6cce4a9
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.pig;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.cassandra.db.BufferCell;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnDef;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.datastax.driver.core.Row;
+
+public class CqlNativeStorage extends CqlStorage
+{
+ private RecordReader<Long, Row> reader; // assigned in prepareToRead, consumed by getNext
+ private String nativePort; // URL parameter "native_port"
+ private String nativeCoreConnections; // URL parameter "core_conns"
+ private String nativeMaxConnections; // URL parameter "max_conns"
+ private String nativeMinSimultReqs; // URL parameter "min_simult_reqs"
+ private String nativeMaxSimultReqs; // URL parameter "max_simult_reqs"
+ private String nativeConnectionTimeout; // URL parameter "native_timeout"
+ private String nativeReadConnectionTimeout; // URL parameter "native_read_timeout"
+ private String nativeReceiveBufferSize; // URL parameter "rec_buff_size"
+ private String nativeSendBufferSize; // URL parameter "send_buff_size"
+ private String nativeSolinger; // URL parameter "solinger"
+ private String nativeTcpNodelay; // URL parameter "tcp_nodelay"
+ private String nativeReuseAddress; // URL parameter "reuse_address"
+ private String nativeKeepAlive; // URL parameter "keep_alive"
+ private String nativeAuthProvider; // URL parameter "auth_provider"
+ private String nativeSSLTruststorePath; // URL parameter "trust_store_path"
+ private String nativeSSLKeystorePath; // URL parameter "key_store_path"
+ private String nativeSSLTruststorePassword; // URL parameter "trust_store_password"
+ private String nativeSSLKeystorePassword; // URL parameter "key_store_password"
+ private String nativeSSLCipherSuites; // URL parameter "cipher_suites"
+ private String inputCql; // URL parameter "input_cql"
+
+ public CqlNativeStorage()
+ {
+ this(1000); // delegate to the int constructor with a default pageSize of 1000
+ }
+
+ /** @param pageSize limit on the number of CQL rows to fetch per page; handed to the superclass constructor (presumably kept as the pageSize that setLocation forwards to CqlConfigHelper.setInputCQLPageRowSize) */
+ public CqlNativeStorage(int pageSize)
+ {
+ super(pageSize);
+ DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlInputFormat"; // field is not declared in this class, so this overwrites the inherited value
+ }
+
+ public void prepareToRead(RecordReader reader, PigSplit split)
+ {
+ this.reader = reader; // raw RecordReader stored into the RecordReader<Long, Row> field (unchecked); split is not used here
+ }
+
+    /** Reads the next row from the record reader and converts it to a Pig tuple; returns null once the reader has no more rows. */
+    public Tuple getNext() throws IOException
+    {
+        try
+        {
+            // load the next pair
+            if (!reader.nextKeyValue())
+                return null;
+
+            CfInfo cfInfo = getCfInfo(loadSignature);
+            CfDef cfDef = cfInfo.cfDef;
+            Row row = reader.getCurrentValue();
+            Tuple tuple = TupleFactory.getInstance().newTuple(cfDef.column_metadata.size());
+            int i = 0;
+            // one tuple slot per column definition, filled in column_metadata order
+            for (ColumnDef cdef : cfDef.column_metadata)
+            {
+                ByteBuffer columnValue = row.getBytesUnsafe(ByteBufferUtil.string(cdef.name.duplicate()));
+                if (columnValue != null)
+                {
+                    Cell cell = new BufferCell(CellNames.simpleDense(cdef.name), columnValue);
+                    AbstractType<?> validator = getValidatorMap(cfDef).get(cdef.name);
+                    setTupleValue(tuple, i, cqlColumnToObj(cell, cfDef), validator);
+                }
+                else
+                    tuple.set(i, null);
+                i++;
+            }
+            return tuple;
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt(); // restore the flag so code further up can still see the interruption
+            throw new IOException(e.getMessage(), e); // same message as before, but the cause is no longer dropped
+        }
+    }
+
+    /** Parses the cql:// location, copies every setting that was supplied into the job configuration, validates the result and calls initSchema. */
+    public void setLocation(String location, Job job) throws IOException
+    {
+        conf = job.getConfiguration();
+        setLocationFromUri(location);
+
+        if (username != null && password != null)
+        {
+            ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
+            CqlConfigHelper.setUserNameAndPassword(conf, username, password);
+        }
+        if (splitSize > 0)
+            ConfigHelper.setInputSplitSize(conf, splitSize);
+        if (partitionerClass != null)
+            ConfigHelper.setInputPartitioner(conf, partitionerClass);
+        if (initHostAddress != null)
+            ConfigHelper.setInputInitialAddress(conf, initHostAddress);
+        if (rpcPort != null)
+            ConfigHelper.setInputRpcPort(conf, rpcPort);
+        if (nativePort != null)
+            CqlConfigHelper.setInputNativePort(conf, nativePort);
+        if (nativeCoreConnections != null)
+            CqlConfigHelper.setInputCoreConnections(conf, nativeCoreConnections);
+        if (nativeMaxConnections != null)
+            CqlConfigHelper.setInputMaxConnections(conf, nativeMaxConnections);
+        if (nativeMinSimultReqs != null)
+            CqlConfigHelper.setInputMinSimultReqPerConnections(conf, nativeMinSimultReqs);
+        if (nativeMaxSimultReqs != null)
+            CqlConfigHelper.setInputMaxSimultReqPerConnections(conf, nativeMaxSimultReqs);
+        if (nativeConnectionTimeout != null)
+            CqlConfigHelper.setInputNativeConnectionTimeout(conf, nativeConnectionTimeout);
+        if (nativeReadConnectionTimeout != null)
+            CqlConfigHelper.setInputNativeReadConnectionTimeout(conf, nativeReadConnectionTimeout);
+        if (nativeReceiveBufferSize != null)
+            CqlConfigHelper.setInputNativeReceiveBufferSize(conf, nativeReceiveBufferSize);
+        if (nativeSendBufferSize != null)
+            CqlConfigHelper.setInputNativeSendBufferSize(conf, nativeSendBufferSize);
+        if (nativeSolinger != null)
+            CqlConfigHelper.setInputNativeSolinger(conf, nativeSolinger);
+        if (nativeTcpNodelay != null)
+            CqlConfigHelper.setInputNativeTcpNodelay(conf, nativeTcpNodelay);
+        if (nativeReuseAddress != null)
+            CqlConfigHelper.setInputNativeReuseAddress(conf, nativeReuseAddress);
+        if (nativeKeepAlive != null)
+            CqlConfigHelper.setInputNativeKeepAlive(conf, nativeKeepAlive);
+        if (nativeAuthProvider != null)
+            CqlConfigHelper.setInputNativeAuthProvider(conf, nativeAuthProvider);
+        if (nativeSSLTruststorePath != null)
+            CqlConfigHelper.setInputNativeSSLTruststorePath(conf, nativeSSLTruststorePath);
+        if (nativeSSLKeystorePath != null)
+            CqlConfigHelper.setInputNativeSSLKeystorePath(conf, nativeSSLKeystorePath);
+        if (nativeSSLTruststorePassword != null)
+            CqlConfigHelper.setInputNativeSSLTruststorePassword(conf, nativeSSLTruststorePassword);
+        if (nativeSSLKeystorePassword != null)
+            CqlConfigHelper.setInputNativeSSLKeystorePassword(conf, nativeSSLKeystorePassword);
+        if (nativeSSLCipherSuites != null)
+            CqlConfigHelper.setInputNativeSSLCipherSuites(conf, nativeSSLCipherSuites);
+
+        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
+        setConnectionInformation();
+        CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
+        if (inputCql != null) // input_cql is optional; guard it like every other URL-supplied setting above
+            CqlConfigHelper.setInputCql(conf, inputCql);
+        if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null) // applied after the URL split_size above, so the environment value wins
+        {
+            try
+            {
+                ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+            }
+            catch (NumberFormatException e)
+            {
+                throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e);
+            }
+        }
+
+        if (ConfigHelper.getInputInitialAddress(conf) == null)
+            throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+        if (ConfigHelper.getInputPartitioner(conf) == null)
+            throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+        if (loadSignature == null)
+            loadSignature = location;
+
+        initSchema(loadSignature);
+    }
+
+    /** Splits a cql:// location into credentials, keyspace, column family and optional query parameters, storing each in its field; any parse failure is rethrown as an IOException carrying the expected syntax. */
+    private void setLocationFromUri(String location) throws IOException
+    {
+        try
+        {
+            if (!location.startsWith("cql://"))
+                throw new IllegalArgumentException("Bad scheme: " + location);
+
+            String[] urlParts = location.split("\\?");
+            if (urlParts.length > 1)
+            {
+                Map<String, String> urlQuery = getQueryMap(urlParts[1]);
+                // each page row size
+                if (urlQuery.containsKey("page_size"))
+                    pageSize = Integer.parseInt(urlQuery.get("page_size"));
+
+                // output prepared statement
+                if (urlQuery.containsKey("output_query"))
+                    outputQuery = urlQuery.get("output_query");
+
+                // split size
+                if (urlQuery.containsKey("split_size"))
+                    splitSize = Integer.parseInt(urlQuery.get("split_size"));
+                if (urlQuery.containsKey("partitioner"))
+                    partitionerClass = urlQuery.get("partitioner");
+                if (urlQuery.containsKey("use_secondary"))
+                    usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
+                if (urlQuery.containsKey("init_address"))
+                    initHostAddress = urlQuery.get("init_address");
+
+                if (urlQuery.containsKey("native_port"))
+                    nativePort = urlQuery.get("native_port");
+                if (urlQuery.containsKey("core_conns"))
+                    nativeCoreConnections = urlQuery.get("core_conns");
+                if (urlQuery.containsKey("max_conns"))
+                    nativeMaxConnections = urlQuery.get("max_conns");
+                if (urlQuery.containsKey("min_simult_reqs"))
+                    nativeMinSimultReqs = urlQuery.get("min_simult_reqs");
+                if (urlQuery.containsKey("max_simult_reqs"))
+                    nativeMaxSimultReqs = urlQuery.get("max_simult_reqs");
+                if (urlQuery.containsKey("native_timeout"))
+                    nativeConnectionTimeout = urlQuery.get("native_timeout");
+                if (urlQuery.containsKey("native_read_timeout"))
+                    nativeReadConnectionTimeout = urlQuery.get("native_read_timeout");
+                if (urlQuery.containsKey("rec_buff_size"))
+                    nativeReceiveBufferSize = urlQuery.get("rec_buff_size");
+                if (urlQuery.containsKey("send_buff_size"))
+                    nativeSendBufferSize = urlQuery.get("send_buff_size");
+                if (urlQuery.containsKey("solinger"))
+                    nativeSolinger = urlQuery.get("solinger");
+                if (urlQuery.containsKey("tcp_nodelay"))
+                    nativeTcpNodelay = urlQuery.get("tcp_nodelay");
+                if (urlQuery.containsKey("reuse_address"))
+                    nativeReuseAddress = urlQuery.get("reuse_address");
+                if (urlQuery.containsKey("keep_alive"))
+                    nativeKeepAlive = urlQuery.get("keep_alive");
+                if (urlQuery.containsKey("auth_provider"))
+                    nativeAuthProvider = urlQuery.get("auth_provider");
+                if (urlQuery.containsKey("trust_store_path"))
+                    nativeSSLTruststorePath = urlQuery.get("trust_store_path");
+                if (urlQuery.containsKey("key_store_path"))
+                    nativeSSLKeystorePath = urlQuery.get("key_store_path");
+                if (urlQuery.containsKey("trust_store_password"))
+                    nativeSSLTruststorePassword = urlQuery.get("trust_store_password");
+                if (urlQuery.containsKey("key_store_password"))
+                    nativeSSLKeystorePassword = urlQuery.get("key_store_password");
+                if (urlQuery.containsKey("cipher_suites"))
+                    nativeSSLCipherSuites = urlQuery.get("cipher_suites");
+                if (urlQuery.containsKey("input_cql"))
+                    inputCql = urlQuery.get("input_cql");
+                if (urlQuery.containsKey("rpc_port"))
+                    rpcPort = urlQuery.get("rpc_port");
+            }
+            String[] parts = urlParts[0].split("/+");
+            String[] credentialsAndKeyspace = parts[1].split("@");
+            if (credentialsAndKeyspace.length > 1)
+            {
+                String[] credentials = credentialsAndKeyspace[0].split(":", 2); // limit 2: a ':' inside the password stays part of the password
+                username = credentials[0];
+                password = credentials[1];
+                keyspace = credentialsAndKeyspace[1];
+            }
+            else
+            {
+                keyspace = parts[1];
+            }
+            column_family = parts[2];
+        }
+        catch (Exception e)
+        {
+            throw new IOException("Expected 'cql://[username:password@]<keyspace>/<columnfamily>" +
+                                  "[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]" +
+                                  "[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]" +
+                                  "[&init_address=<host>][&native_port=<native_port>][&core_conns=<core_conns>]" +
+                                  "[&max_conns=<max_conns>][&min_simult_reqs=<min_simult_reqs>][&max_simult_reqs=<max_simult_reqs>]" +
+                                  "[&native_timeout=<native_timeout>][&native_read_timeout=<native_read_timeout>][&rec_buff_size=<rec_buff_size>]" +
+                                  "[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
+                                  "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
+                                  "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
+                                  "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>][&rpc_port=<rpc_port>]]': " + e.getMessage(), e); // keep the cause
+        }
+    }
+
+}