You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/06 21:19:16 UTC
[04/10] Add CQL3 input/output formats patch by Alex Liu;
reviewed by jbellis and Mike Schrag for CASSANDRA-4421
http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e0ad1b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java
new file mode 100644
index 0000000..3939e0b
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.Progressable;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
+ * pairs to a Cassandra column family. In particular, it applies the bound variables
+ * in the value to the prepared statement, which it associates with the key and, in
+ * turn, with the responsible endpoint.
+ *
+ * <p>
+ * Furthermore, this writer groups the cql queries by the endpoint responsible for
+ * the rows being affected. This allows the cql queries to be executed in parallel,
+ * directly to a responsible endpoint.
+ * </p>
+ *
+ * @see ColumnFamilyOutputFormat
+ */
+final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>>
+{
+ private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordWriter.class);
+
+ // handles for clients for each range running in the threadpool
+ private final Map<Range, RangeClient> clients;
+
+ // host to prepared statement id mappings
+ private ConcurrentHashMap<Cassandra.Client, Integer> preparedStatements = new ConcurrentHashMap<Cassandra.Client, Integer>();
+
+ private final String cql;
+
+ private AbstractType<?> keyValidator;
+ private String [] partitionkeys;
+
+ /**
+  * Creates a writer for a running Hadoop task: the writer is configured from the
+  * task's {@link Configuration} and reports progress through a
+  * {@link Progressable} wrapped around the task context.
+  *
+  * @param context the task attempt context
+  * @throws IOException if the configuration-based constructor fails
+  */
+ ColumnFamilyRecordWriter(TaskAttemptContext context) throws IOException
+ {
+     this(context.getConfiguration());
+     progressable = new Progressable(context);
+ }
+
+ /**
+ * Creates a writer from a bare Hadoop configuration and a caller-supplied
+ * progress reporter.
+ *
+ * @param conf the Hadoop configuration describing the output
+ * @param progressable the progress reporter stored on this writer
+ * @throws IOException if the configuration-based constructor fails
+ */
+ ColumnFamilyRecordWriter(Configuration conf, Progressable progressable) throws IOException
+ {
+ this(conf);
+ this.progressable = progressable;
+ }
+
+ /**
+  * Creates a writer from a Hadoop configuration. Reads the output CQL statement
+  * from the configuration, then opens a short-lived connection to one host of the
+  * ring in order to load the partition key validator and partition key names of
+  * the output column family.
+  *
+  * @param conf the Hadoop configuration describing the output
+  * @throws IOException wrapping any failure while contacting the cluster or
+  *                     reading the schema
+  */
+ ColumnFamilyRecordWriter(Configuration conf) throws IOException
+ {
+     super(conf);
+     this.clients = new HashMap<Range, RangeClient>();
+     cql = CQLConfigHelper.getOutputCql(conf);
+
+     try
+     {
+         String host = getAnyHost();
+         int port = ConfigHelper.getOutputRpcPort(conf);
+         Cassandra.Client client = ColumnFamilyOutputFormat.createAuthenticatedClient(host, port, conf);
+         try
+         {
+             retrievePartitionKeyValidator(client);
+         }
+         finally
+         {
+             // release the bootstrap connection even when the schema lookup fails,
+             // otherwise the transport would be leaked on the error path
+             if (client != null)
+             {
+                 TTransport transport = client.getOutputProtocol().getTransport();
+                 if (transport.isOpen())
+                     transport.close();
+             }
+         }
+     }
+     catch (Exception e)
+     {
+         throw new IOException(e);
+     }
+ }
+
+ /**
+  * Closes every range client that was created by this writer. All clients are
+  * closed before any failure is reported; if several clients fail, the exception
+  * from the last failing one is the one thrown.
+  *
+  * @throws IOException the last exception raised while closing a client
+  */
+ @Override
+ public void close() throws IOException
+ {
+     IOException failure = null;
+     Iterator<RangeClient> remaining = clients.values().iterator();
+     while (remaining.hasNext())
+     {
+         RangeClient rangeClient = remaining.next();
+         try
+         {
+             rangeClient.close();
+         }
+         catch (IOException e)
+         {
+             // remember the failure but keep closing the other clients
+             failure = e;
+         }
+     }
+
+     if (failure == null)
+         return;
+     throw failure;
+ }
+
+ /**
+  * Routes one set of bound variable values to the range client responsible for
+  * the row. The row key is built from the partition key columns in
+  * <code>keys</code>, the owning token range is looked up in the ring cache, and
+  * the (row key, values) pair is handed to that range's client. A client is
+  * created and started the first time a range is seen.
+  *
+  * @param keys
+  *            partition key column names mapped to their serialized values.
+  * @param values
+  *            the bound variable values for the prepared statement.
+  * @throws IOException
+  */
+ @Override
+ public void write(Map<String, ByteBuffer> keys, List<ByteBuffer> values) throws IOException
+ {
+     final ByteBuffer partitionKey = getRowKey(keys);
+     final Range<Token> owningRange = ringCache.getRange(partitionKey);
+
+     RangeClient rangeClient = clients.get(owningRange);
+     if (rangeClient == null)
+     {
+         // first key for this range: spin up a dedicated client
+         rangeClient = new RangeClient(ringCache.getEndpoint(owningRange));
+         rangeClient.start();
+         clients.put(owningRange, rangeClient);
+     }
+
+     rangeClient.put(Pair.create(partitionKey, values));
+     progressable.progress();
+ }
+
+ /**
+ * A client that runs in a threadpool and connects to the list of endpoints for a particular
+ * range. Binded variable values for keys in that range are sent to this client via a queue.
+ */
+ public class RangeClient extends AbstractRangeClient<List<ByteBuffer>>
+ {
+ /**
+ * Constructs a {@link RangeClient} for the given endpoints.
+ * The list is passed unchanged to the superclass constructor.
+ * @param endpoints the possible endpoints to execute the cql queries on
+ */
+ public RangeClient(List<InetAddress> endpoints)
+ {
+ super(endpoints);
+ }
+
+ /**
+ * Loops collecting cql bound variable values from the queue and sending them to Cassandra.
+ * <p>
+ * Each pass takes one entry from the queue and executes it, plus any further entries
+ * that can be polled, up to <code>batchThreshold</code> executions. When an execution
+ * fails, the current client is closed and a connection to the next endpoint of this
+ * range is attempted. Once no endpoint is left, or a connection attempt fails with
+ * something other than a {@link TException}, the failure is stored in
+ * <code>lastException</code> and the loop ends.
+ * </p>
+ */
+ public void run()
+ {
+ outer:
+ // keep going while the writer is running, and afterwards until the queue is drained
+ while (run || !queue.isEmpty())
+ {
+ Pair<ByteBuffer, List<ByteBuffer>> bindVariables;
+ try
+ {
+ // presumably blocks until an entry is available (queue is declared in the superclass)
+ bindVariables = queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ // re-check loop condition after interrupt
+ continue;
+ }
+
+ // a fresh iterator per batch: failover always starts from the first endpoint
+ Iterator<InetAddress> iter = endpoints.iterator();
+ while (true)
+ {
+ // send the bound variables to the last-used endpoint. first time through, this will NPE harmlessly
+ // (no client yet); the NPE is caught below and triggers the first connection attempt.
+ try
+ {
+ int i = 0;
+ int itemId = preparedStatement(client);
+ while (bindVariables != null)
+ {
+ // NOTE(review): consistency level is hard-coded to ONE here — confirm this is intended
+ client.execute_prepared_cql3_query(itemId, bindVariables.right, ConsistencyLevel.ONE);
+ i++;
+
+ // cap the number of executions done in one pass
+ if (i >= batchThreshold)
+ break;
+
+ // presumably non-blocking; a null result ends this pass
+ bindVariables = queue.poll();
+ }
+
+ // pass finished without error: go back to waiting on the queue
+ break;
+ }
+ catch (Exception e)
+ {
+ closeInternal();
+ // out of endpoints: record the failure and stop this client
+ if (!iter.hasNext())
+ {
+ lastException = new IOException(e);
+ break outer;
+ }
+ }
+
+ // attempt to connect to a different endpoint
+ try
+ {
+ InetAddress address = iter.next();
+ String host = address.getHostName();
+ int port = ConfigHelper.getOutputRpcPort(conf);
+ client = ColumnFamilyOutputFormat.createAuthenticatedClient(host, port, conf);
+ }
+ catch (Exception e)
+ {
+ closeInternal();
+ // TException means something unexpected went wrong to that endpoint, so
+ // we should try again to another. Other exceptions (auth or invalid request) are fatal.
+ if ((!(e instanceof TException)) || !iter.hasNext())
+ {
+ lastException = new IOException(e);
+ break outer;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+  * Returns the prepared statement id to use with the given client. The id is
+  * served from the per-client cache when present; otherwise the configured cql
+  * query is prepared on the server and the resulting id is cached.
+  *
+  * @param client the connection the statement will be executed on
+  * @return the prepared statement id for that connection
+  * @throws RuntimeException if the server rejects or fails to prepare the query
+  */
+ private int preparedStatement(Cassandra.Client client)
+ {
+     Integer cachedId = preparedStatements.get(client);
+     if (cachedId != null)
+         return cachedId;
+
+     CqlPreparedResult prepared;
+     try
+     {
+         prepared = client.prepare_cql3_query(ByteBufferUtil.bytes(cql), Compression.NONE);
+     }
+     catch (InvalidRequestException e)
+     {
+         throw new RuntimeException("failed to prepare cql query " + cql, e);
+     }
+     catch (TException e)
+     {
+         throw new RuntimeException("failed to prepare cql query " + cql, e);
+     }
+
+     // if an id was stored for this client in the meantime, that one wins
+     Integer racedId = preparedStatements.putIfAbsent(client, Integer.valueOf(prepared.itemId));
+     if (racedId != null)
+         return racedId;
+     return prepared.itemId;
+ }
+ }
+
+ /**
+  * Builds the row key for one record from its partition key columns. With a
+  * composite key validator, the value of every partition key column is looked
+  * up by name and combined through the validator; otherwise the value of the
+  * first partition key column is returned as is.
+  *
+  * @param keysMap partition key column names mapped to their serialized values
+  * @return the serialized row key
+  */
+ private ByteBuffer getRowKey(Map<String, ByteBuffer> keysMap)
+ {
+     if (!(keyValidator instanceof CompositeType))
+         return keysMap.get(partitionkeys[0]);
+
+     ByteBuffer[] components = new ByteBuffer[partitionkeys.length];
+     int position = 0;
+     for (String columnName : partitionkeys)
+         components[position++] = keysMap.get(columnName);
+
+     return ((CompositeType) keyValidator).build(components);
+ }
+
+ /**
+  * Retrieves the key validator and the partition key names of the output column
+  * family from the system.schema_columnfamilies table and stores them in
+  * <code>keyValidator</code> and <code>partitionkeys</code>.
+  *
+  * @param client an open connection used to run the schema query
+  * @throws IOException if the schema table has no row for the configured
+  *                     keyspace and column family, or the validator cannot be parsed
+  * @throws Exception   if the query itself fails
+  */
+ private void retrievePartitionKeyValidator(Cassandra.Client client) throws Exception
+ {
+     String keyspace = ConfigHelper.getOutputKeyspace(conf);
+     String cfName = ConfigHelper.getOutputColumnFamily(conf);
+     String query = "SELECT key_validator," +
+                    " key_aliases " +
+                    "FROM system.schema_columnfamilies " +
+                    "WHERE keyspace_name='%s' and columnfamily_name='%s'";
+     String formatted = String.format(query, keyspace, cfName);
+     CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(formatted), Compression.NONE, ConsistencyLevel.ONE);
+
+     // an unknown keyspace/column family yields no row; fail with a clear message
+     // instead of an IndexOutOfBoundsException from rows.get(0)
+     if (result.rows == null || result.rows.isEmpty())
+         throw new IOException(String.format("No schema found for column family %s.%s", keyspace, cfName));
+
+     // column order follows the SELECT clause: key_validator, then key_aliases
+     List<Column> columns = result.rows.get(0).columns;
+
+     String validator = ByteBufferUtil.string(ByteBuffer.wrap(columns.get(0).getValue()));
+     keyValidator = parseType(validator);
+
+     String keyString = ByteBufferUtil.string(ByteBuffer.wrap(columns.get(1).getValue()));
+     logger.debug("partition keys: {}", keyString);
+
+     List<String> keys = FBUtilities.fromJsonList(keyString);
+     partitionkeys = keys.toArray(new String[keys.size()]);
+ }
+
+ /**
+  * Resolves a marshal type name to its {@link AbstractType}. The counter column
+  * type is mapped to {@link LongType}; every other name goes through
+  * {@link TypeParser}.
+  *
+  * @param type the type name as stored in the schema
+  * @return the matching type instance
+  * @throws IOException if the name cannot be parsed
+  */
+ private AbstractType<?> parseType(String type) throws IOException
+ {
+     // always treat counters like longs, specifically CCT.compose is not what we need
+     if ("org.apache.cassandra.db.marshal.CounterColumnType".equals(type))
+         return LongType.instance;
+
+     try
+     {
+         return TypeParser.parse(type);
+     }
+     catch (ConfigurationException e)
+     {
+         throw new IOException(e);
+     }
+     catch (SyntaxException e)
+     {
+         throw new IOException(e);
+     }
+ }
+
+ /**
+  * Returns one endpoint of the output keyspace's ring: the first endpoint of the
+  * first token range that lists any. The connection used for the lookup is
+  * always closed before returning, including when the ring lookup fails.
+  *
+  * @return the address of an endpoint, as reported by describe_ring
+  * @throws IOException if the ring reports no endpoints at all
+  * @throws InvalidRequestException if the server rejects the describe_ring call
+  * @throws TException on transport-level failures
+  */
+ private String getAnyHost() throws IOException, InvalidRequestException, TException
+ {
+     Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
+     try
+     {
+         // describe_ring is inside the try so the transport is released if it throws
+         List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf));
+         for (TokenRange range : ring)
+         {
+             // skip ranges without endpoints rather than failing on get(0)
+             if (range.endpoints != null && !range.endpoints.isEmpty())
+                 return range.endpoints.get(0);
+         }
+         throw new IOException("There are no endpoints");
+     }
+     finally
+     {
+         if (client != null)
+         {
+             TTransport transport = client.getOutputProtocol().getTransport();
+             if (transport.isOpen())
+                 transport.close();
+         }
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e0ad1b/src/java/org/apache/cassandra/thrift/TClientTransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TClientTransportFactory.java b/src/java/org/apache/cassandra/thrift/TClientTransportFactory.java
new file mode 100644
index 0000000..0a73043
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/TClientTransportFactory.java
@@ -0,0 +1,70 @@
+package org.apache.cassandra.thrift;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.transport.TTransport;
+
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Transport factory for establishing thrift connections from clients to a remote server.
+ */
+public interface TClientTransportFactory
+{
+    // interface fields are implicitly public, static and final
+    String PROPERTY_KEY = "cassandra.client.transport.factory";
+    String LONG_OPTION = "transport-factory";
+    String SHORT_OPTION = "tr";
+
+    /**
+     * Opens a client transport to a thrift server.
+     * Example:
+     *
+     * <pre>
+     * TTransport transport = clientTransportFactory.openTransport(host, port, conf);
+     * Cassandra.Iface client = new Cassandra.Client(new TBinaryProtocol(transport));
+     * </pre>
+     *
+     * @param host fully qualified hostname of the server
+     * @param port RPC port of the server
+     * @param conf Hadoop configuration
+     * @return open and ready to use transport
+     * @throws Exception implementation defined; usually throws TTransportException or IOException
+     *         if the connection cannot be established
+     */
+    TTransport openTransport(String host, int port, Configuration conf) throws Exception;
+
+    /**
+     * Sets an implementation defined set of options.
+     * Keys in this map must conform to the set returned by {@link #supportedOptions()}.
+     * @param options option map
+     */
+    void setOptions(Map<String, String> options);
+
+    /**
+     * @return set of options supported by this transport factory implementation
+     */
+    Set<String> supportedOptions();
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e0ad1b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
index 792618d..5905f4a 100644
--- a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
+++ b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
@@ -21,6 +21,10 @@ package org.apache.cassandra.thrift;
*
*/
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
@@ -29,7 +33,7 @@ import org.apache.thrift.transport.TTransportException;
import org.apache.hadoop.conf.Configuration;
-public class TFramedTransportFactory implements ITransportFactory
+public class TFramedTransportFactory implements ITransportFactory, TClientTransportFactory
{
public TTransport openTransport(TSocket socket, Configuration conf) throws TTransportException
{
@@ -37,4 +41,22 @@ public class TFramedTransportFactory implements ITransportFactory
transport.open();
return transport;
}
+
+ //
+ public TTransport openTransport(String host, int port, Configuration conf) throws TTransportException
+ {
+ TSocket socket = new TSocket(host, port);
+ TTransport transport = new TFramedTransport(socket, ConfigHelper.getThriftFramedTransportSize(conf));
+ transport.open();
+ return transport;
+ }
+
+ /**
+ * Accepts an option map and ignores it; the method body is intentionally empty.
+ *
+ * @param options option map (unused)
+ */
+ public void setOptions(Map<String, String> options)
+ {
+ }
+
+ /**
+ * @return an empty set: this factory declares no supported options
+ */
+ public Set<String> supportedOptions()
+ {
+ return Collections.emptySet();
+ }
}