Posted to commits@cassandra.apache.org by al...@apache.org on 2015/05/05 22:56:58 UTC
[1/4] cassandra git commit: Remove Thrift dependencies in bundled tools
Repository: cassandra
Updated Branches:
refs/heads/trunk 5b6154531 -> f698cc228
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 22921e2..51e5e3d 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -18,33 +18,27 @@
package org.apache.cassandra.tools;
import java.io.File;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.util.*;
-import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.cli.*;
-import org.apache.cassandra.auth.PasswordAuthenticator;
+import com.datastax.driver.core.SSLOptions;
+import javax.net.ssl.SSLContext;
import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.SSTableLoader;
-import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.NativeSSTableLoaderClient;
import org.apache.cassandra.utils.OutputHandler;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
public class BulkLoader
{
@@ -54,7 +48,7 @@ public class BulkLoader
private static final String NOPROGRESS_OPTION = "no-progress";
private static final String IGNORE_NODES_OPTION = "ignore";
private static final String INITIAL_HOST_ADDRESS_OPTION = "nodes";
- private static final String RPC_PORT_OPTION = "port";
+ private static final String NATIVE_PORT_OPTION = "port";
private static final String USER_OPTION = "username";
private static final String PASSWD_OPTION = "password";
private static final String THROTTLE_MBITS = "throttle";
@@ -82,13 +76,13 @@ public class BulkLoader
options.directory,
new ExternalClient(
options.hosts,
- options.rpcPort,
+ options.nativePort,
options.user,
options.passwd,
- options.transportFactory,
options.storagePort,
options.sslStoragePort,
- options.serverEncOptions),
+ options.serverEncOptions,
+ buildSSLOptions((EncryptionOptions.ClientEncryptionOptions)options.encOptions)),
handler,
options.connectionsPerHost);
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
@@ -154,8 +148,13 @@ public class BulkLoader
start = lastTime = System.nanoTime();
}
- public void onSuccess(StreamState finalState) {}
- public void onFailure(Throwable t) {}
+ public void onSuccess(StreamState finalState)
+ {
+ }
+
+ public void onFailure(Throwable t)
+ {
+ }
public synchronized void handleStreamEvent(StreamEvent event)
{
@@ -254,14 +253,27 @@ public class BulkLoader
}
}
- static class ExternalClient extends SSTableLoader.Client
+ private static SSLOptions buildSSLOptions(EncryptionOptions.ClientEncryptionOptions clientEncryptionOptions)
+ {
+
+ if (!clientEncryptionOptions.enabled)
+ return null;
+
+ SSLContext sslContext;
+ try
+ {
+ sslContext = SSLFactory.createSSLContext(clientEncryptionOptions, true);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Could not create SSL Context.", e);
+ }
+
+ return new SSLOptions(sslContext, clientEncryptionOptions.cipher_suites);
+ }
+
+ static class ExternalClient extends NativeSSTableLoaderClient
{
- private final Map<String, CFMetaData> knownCfs = new HashMap<>();
- private final Set<InetAddress> hosts;
- private final int rpcPort;
- private final String user;
- private final String passwd;
- private final ITransportFactory transportFactory;
private final int storagePort;
private final int sslStoragePort;
private final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
@@ -270,103 +282,22 @@ public class BulkLoader
int port,
String user,
String passwd,
- ITransportFactory transportFactory,
int storagePort,
int sslStoragePort,
- EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions)
+ EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions,
+ SSLOptions sslOptions)
{
- super();
- this.hosts = hosts;
- this.rpcPort = port;
- this.user = user;
- this.passwd = passwd;
- this.transportFactory = transportFactory;
+ super(hosts, port, user, passwd, sslOptions);
this.storagePort = storagePort;
this.sslStoragePort = sslStoragePort;
this.serverEncOptions = serverEncryptionOptions;
}
@Override
- public void init(String keyspace)
- {
- Iterator<InetAddress> hostiter = hosts.iterator();
- while (hostiter.hasNext())
- {
- try
- {
- // Query endpoint to ranges map and schemas from thrift
- InetAddress host = hostiter.next();
- Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort, this.user, this.passwd, this.transportFactory);
-
- setPartitioner(client.describe_partitioner());
- Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
-
- for (TokenRange tr : client.describe_ring(keyspace))
- {
- Range<Token> range = new Range<>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
- for (String ep : tr.endpoints)
- {
- addRangeForEndpoint(range, InetAddress.getByName(ep));
- }
- }
-
- String cfQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = '%s'",
- SystemKeyspace.NAME,
- LegacySchemaTables.COLUMNFAMILIES,
- keyspace);
- CqlResult cfRes = client.execute_cql3_query(ByteBufferUtil.bytes(cfQuery), Compression.NONE, ConsistencyLevel.ONE);
-
-
- for (CqlRow row : cfRes.rows)
- {
- String columnFamily = UTF8Type.instance.getString(row.columns.get(1).bufferForName());
- String columnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
- SystemKeyspace.NAME,
- LegacySchemaTables.COLUMNS,
- keyspace,
- columnFamily);
- CqlResult columnsRes = client.execute_cql3_query(ByteBufferUtil.bytes(columnsQuery), Compression.NONE, ConsistencyLevel.ONE);
-
- CFMetaData metadata = ThriftConversion.fromThriftCqlRow(row, columnsRes);
- knownCfs.put(metadata.cfName, metadata);
- }
- break;
- }
- catch (Exception e)
- {
- if (!hostiter.hasNext())
- throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
- }
- }
- }
-
- @Override
public StreamConnectionFactory getConnectionFactory()
{
return new BulkLoadConnectionFactory(storagePort, sslStoragePort, serverEncOptions, false);
}
-
- @Override
- public CFMetaData getCFMetaData(String keyspace, String cfName)
- {
- return knownCfs.get(cfName);
- }
-
- private static Cassandra.Client createThriftClient(String host, int port, String user, String passwd, ITransportFactory transportFactory) throws Exception
- {
- TTransport trans = transportFactory.openTransport(host, port);
- TProtocol protocol = new TBinaryProtocol(trans);
- Cassandra.Client client = new Cassandra.Client(protocol);
- if (user != null && passwd != null)
- {
- Map<String, String> credentials = new HashMap<>();
- credentials.put(PasswordAuthenticator.USERNAME_KEY, user);
- credentials.put(PasswordAuthenticator.PASSWORD_KEY, passwd);
- AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
- client.login(authenticationRequest);
- }
- return client;
- }
}
static class LoaderOptions
@@ -376,13 +307,12 @@ public class BulkLoader
public boolean debug;
public boolean verbose;
public boolean noProgress;
- public int rpcPort = 9160;
+ public int nativePort = 9042;
public String user;
public String passwd;
public int throttle = 0;
public int storagePort;
public int sslStoragePort;
- public ITransportFactory transportFactory = new TFramedTransportFactory();
public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions();
public int connectionsPerHost = 1;
public EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions();
@@ -438,8 +368,8 @@ public class BulkLoader
opts.verbose = cmd.hasOption(VERBOSE_OPTION);
opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION);
- if (cmd.hasOption(RPC_PORT_OPTION))
- opts.rpcPort = Integer.parseInt(cmd.getOptionValue(RPC_PORT_OPTION));
+ if (cmd.hasOption(NATIVE_PORT_OPTION))
+ opts.nativePort = Integer.parseInt(cmd.getOptionValue(NATIVE_PORT_OPTION));
if (cmd.hasOption(USER_OPTION))
opts.user = cmd.getOptionValue(USER_OPTION);
@@ -558,13 +488,6 @@ public class BulkLoader
opts.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
}
- if (cmd.hasOption(TRANSPORT_FACTORY))
- {
- ITransportFactory transportFactory = getTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY));
- configureTransportFactory(transportFactory, opts);
- opts.transportFactory = transportFactory;
- }
-
return opts;
}
catch (ParseException | ConfigurationException | MalformedURLException e)
@@ -574,50 +497,6 @@ public class BulkLoader
}
}
- private static ITransportFactory getTransportFactory(String transportFactory)
- {
- try
- {
- Class<?> factory = Class.forName(transportFactory);
- if (!ITransportFactory.class.isAssignableFrom(factory))
- throw new IllegalArgumentException(String.format("transport factory '%s' " +
- "not derived from ITransportFactory", transportFactory));
- return (ITransportFactory) factory.newInstance();
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException(String.format("Cannot create a transport factory '%s'.", transportFactory), e);
- }
- }
-
- private static void configureTransportFactory(ITransportFactory transportFactory, LoaderOptions opts)
- {
- Map<String, String> options = new HashMap<>();
- // If the supplied factory supports the same set of options as our SSL impl, set those
- if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE))
- options.put(SSLTransportFactory.TRUSTSTORE, opts.encOptions.truststore);
- if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE_PASSWORD))
- options.put(SSLTransportFactory.TRUSTSTORE_PASSWORD, opts.encOptions.truststore_password);
- if (transportFactory.supportedOptions().contains(SSLTransportFactory.PROTOCOL))
- options.put(SSLTransportFactory.PROTOCOL, opts.encOptions.protocol);
- if (transportFactory.supportedOptions().contains(SSLTransportFactory.CIPHER_SUITES))
- options.put(SSLTransportFactory.CIPHER_SUITES, Joiner.on(',').join(opts.encOptions.cipher_suites));
-
- if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE)
- && opts.encOptions.require_client_auth)
- options.put(SSLTransportFactory.KEYSTORE, opts.encOptions.keystore);
- if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE_PASSWORD)
- && opts.encOptions.require_client_auth)
- options.put(SSLTransportFactory.KEYSTORE_PASSWORD, opts.encOptions.keystore_password);
-
- // Now check if any of the factory's supported options are set as system properties
- for (String optionKey : transportFactory.supportedOptions())
- if (System.getProperty(optionKey) != null)
- options.put(optionKey, System.getProperty(optionKey));
-
- transportFactory.setOptions(options);
- }
-
private static void errorMsg(String msg, CmdLineOptions options)
{
System.err.println(msg);
@@ -633,7 +512,7 @@ public class BulkLoader
options.addOption(null, NOPROGRESS_OPTION, "don't display progress");
options.addOption("i", IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes");
options.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "Required. try to connect to these hosts (comma separated) initially for ring information");
- options.addOption("p", RPC_PORT_OPTION, "rpc port", "port used for rpc (default 9160)");
+ options.addOption("p", NATIVE_PORT_OPTION, "rpc port", "port used for native connection (default 9042)");
options.addOption("t", THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)");
options.addOption("u", USER_OPTION, "username", "username for cassandra authentication");
options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
new file mode 100644
index 0000000..1ef686c
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import com.datastax.driver.core.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.schema.LegacySchemaTables;
+
+public class NativeSSTableLoaderClient extends SSTableLoader.Client
+{
+ protected final Map<String, CFMetaData> tables;
+ private final Collection<InetAddress> hosts;
+ private final int port;
+ private final String username;
+ private final String password;
+ private final SSLOptions sslOptions;
+
+ public NativeSSTableLoaderClient(Collection<InetAddress> hosts, int port, String username, String password, SSLOptions sslOptions)
+ {
+ super();
+ this.tables = new HashMap<>();
+ this.hosts = hosts;
+ this.port = port;
+ this.username = username;
+ this.password = password;
+ this.sslOptions = sslOptions;
+ }
+
+ public void init(String keyspace)
+ {
+ Cluster.Builder builder = Cluster.builder().addContactPoints(hosts).withPort(port);
+ if (sslOptions != null)
+ builder.withSSL(sslOptions);
+ if (username != null && password != null)
+ builder = builder.withCredentials(username, password);
+
+ try (Cluster cluster = builder.build())
+ {
+ Session session = cluster.connect();
+ Metadata metadata = cluster.getMetadata();
+
+ setPartitioner(metadata.getPartitioner());
+
+ Set<TokenRange> tokenRanges = metadata.getTokenRanges();
+
+ Token.TokenFactory tokenFactory = getPartitioner().getTokenFactory();
+
+ for (TokenRange tokenRange : tokenRanges)
+ {
+ Set<Host> endpoints = metadata.getReplicas(keyspace, tokenRange);
+ Range<Token> range = new Range<>(tokenFactory.fromString(tokenRange.getStart().getValue().toString()),
+ tokenFactory.fromString(tokenRange.getEnd().getValue().toString()));
+ for (Host endpoint : endpoints)
+ addRangeForEndpoint(range, endpoint.getAddress());
+ }
+
+ tables.putAll(fetchTablesMetadata(keyspace, session));
+ }
+ }
+
+ public CFMetaData getTableMetadata(String tableName)
+ {
+ return tables.get(tableName);
+ }
+
+ @Override
+ public void setTableMetadata(CFMetaData cfm)
+ {
+ tables.put(cfm.cfName, cfm);
+ }
+
+ private static Map<String, CFMetaData> fetchTablesMetadata(String keyspace, Session session)
+ {
+ Map<String, CFMetaData> tables = new HashMap<>();
+
+ String query = String.format("SELECT columnfamily_name, cf_id, type, comparator, subcomparator, is_dense FROM %s.%s WHERE keyspace_name = '%s'",
+ SystemKeyspace.NAME,
+ LegacySchemaTables.COLUMNFAMILIES,
+ keyspace);
+
+ for (Row row : session.execute(query))
+ {
+ String name = row.getString("columnfamily_name");
+ UUID id = row.getUUID("cf_id");
+ ColumnFamilyType type = ColumnFamilyType.valueOf(row.getString("type"));
+ AbstractType rawComparator = TypeParser.parse(row.getString("comparator"));
+ AbstractType subComparator = row.isNull("subcomparator")
+ ? null
+ : TypeParser.parse(row.getString("subcomparator"));
+ boolean isDense = row.getBool("is_dense");
+ CellNameType comparator = CellNames.fromAbstractType(CFMetaData.makeRawAbstractType(rawComparator, subComparator),
+ isDense);
+
+ tables.put(name, new CFMetaData(keyspace, name, type, comparator, id));
+ }
+
+ return tables;
+ }
+}
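The new client drops into SSTableLoader where the Thrift-based ExternalClient used to go. A minimal standalone sketch, assuming a single local contact point, default superuser credentials, and no client-to-node SSL (all placeholders), run from a method declared to throw Exception:

    Collection<InetAddress> hosts = Collections.singleton(InetAddress.getByName("127.0.0.1"));
    // null SSLOptions leaves client-to-node encryption off
    SSTableLoader.Client client = new NativeSSTableLoaderClient(hosts, 9042, "cassandra", "cassandra", null);
    SSTableLoader loader = new SSTableLoader(new File("/var/lib/cassandra/data/ks/table"),
                                             client,
                                             new OutputHandler.SystemOutput(false, false));
    loader.stream().get(); // returns a Future<StreamState>; get() blocks until streaming finishes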
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/test/pig/org/apache/cassandra/pig/CqlTableTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/CqlTableTest.java b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
index 26f9f68..72fdd5a 100644
--- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.Iterator;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.thrift.TException;
import org.junit.Assert;
@@ -70,6 +69,11 @@ public class CqlTableTest extends PigTestBase
"UPDATE collectiontable SET n['key2'] = 'value2' WHERE m = 'book2';",
"UPDATE collectiontable SET n['key3'] = 'value3' WHERE m = 'book3';",
"UPDATE collectiontable SET n['key4'] = 'value4' WHERE m = 'book4';",
+ "CREATE TABLE nulltable(m text PRIMARY KEY, n map<text, text>);",
+ "UPDATE nulltable SET n['key1'] = 'value1' WHERE m = 'book1';",
+ "UPDATE nulltable SET n['key2'] = 'value2' WHERE m = 'book2';",
+ "UPDATE nulltable SET n['key3'] = 'value3' WHERE m = 'book3';",
+ "UPDATE nulltable SET n['key4'] = 'value4' WHERE m = 'book4';",
};
@BeforeClass
@@ -229,65 +233,32 @@ public class CqlTableTest extends PigTestBase
}
@Test
- public void testCassandraStorageSchema() throws IOException
+ public void testCqlNativeStorageNullTuples() throws IOException
{
- //results: (key1,{((111,),),((111,column1),100),((111,column2),10.1)})
- pig.registerQuery("rows = LOAD 'cassandra://cql3ks/cqltable?" + defaultParameters + "' USING CassandraStorage();");
-
- //schema: {key: chararray,columns: {(name: (),value: bytearray)}}
- Iterator<Tuple> it = pig.openIterator("rows");
- if (it.hasNext()) {
- Tuple t = it.next();
- String rowKey = t.get(0).toString();
- Assert.assertEquals(rowKey, "key1");
- DataBag columns = (DataBag) t.get(1);
- Iterator<Tuple> iter = columns.iterator();
- int i = 0;
- while (iter.hasNext())
- {
- i++;
- Tuple column = iter.next();
- if (i==1)
- {
- Assert.assertEquals(((Tuple) column.get(0)).get(0), 111);
- Assert.assertEquals(((Tuple) column.get(0)).get(1), "");
- Assert.assertEquals(column.get(1).toString(), "");
- }
- if (i==2)
- {
- Assert.assertEquals(((Tuple) column.get(0)).get(0), 111);
- Assert.assertEquals(((Tuple) column.get(0)).get(1), "column1");
- Assert.assertEquals(column.get(1), 100);
- }
- if (i==3)
- {
- Assert.assertEquals(((Tuple) column.get(0)).get(0), 111);
- Assert.assertEquals(((Tuple) column.get(0)).get(1), "column2");
- Assert.assertEquals(column.get(1), 10.1f);
- }
- }
- Assert.assertEquals(3, columns.size());
- }
- else
- {
- Assert.fail("Can't fetch any data");
- }
+ //input_cql=select * from nulltable where token(m) > ? and token(m) <= ?
+ NullTupleTest("nulltable= LOAD 'cql://cql3ks/nulltable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20nulltable%20where%20token(m)%20%3E%20%3F%20and%20token(m)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+ }
- //results: (key1,(column1,100),(column2,10.1))
- pig.registerQuery("compact_rows = LOAD 'cassandra://cql3ks/compactcqltable?" + defaultParameters + "' USING CassandraStorage();");
+ private void NullTupleTest(String initialQuery) throws IOException
+ {
+ pig.setBatchOn();
+ pig.registerQuery(initialQuery);
+ pig.registerQuery("recs= FOREACH nulltable GENERATE TOTUPLE(TOTUPLE('m', m) ), TOTUPLE(TOTUPLE('map', TOTUPLE('m', null), TOTUPLE('n', null)));");
+ pig.registerQuery("STORE recs INTO 'cql://cql3ks/nulltable?" + defaultParameters + nativeParameters + "&output_query=update+cql3ks.nulltable+set+n+%3D+%3F' USING CqlNativeStorage();");
+ pig.executeBatch();
- //schema: {key: chararray,column1: (name: chararray,value: int),column2: (name: chararray,value: float)}
- it = pig.openIterator("compact_rows");
+ pig.registerQuery("result= LOAD 'cql://cql3ks/nulltable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20nulltable%20where%20token(m)%20%3E%20%3F%20and%20token(m)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+ Iterator<Tuple> it = pig.openIterator("result");
if (it.hasNext()) {
Tuple t = it.next();
- String rowKey = t.get(0).toString();
- Assert.assertEquals(rowKey, "key1");
- Tuple column = (Tuple) t.get(1);
- Assert.assertEquals(column.get(0), "column1");
- Assert.assertEquals(column.get(1), 100);
- column = (Tuple) t.get(2);
- Assert.assertEquals(column.get(0), "column2");
- Assert.assertEquals(column.get(1), 10.1f);
+ Tuple t1 = (Tuple) t.get(1);
+ Assert.assertEquals(t1.size(), 2);
+ Tuple element1 = (Tuple) t1.get(0);
+ Tuple element2 = (Tuple) t1.get(1);
+ Assert.assertEquals(element1.get(0), "m");
+ Assert.assertEquals(element1.get(1), "");
+ Assert.assertEquals(element2.get(0), "n");
+ Assert.assertEquals(element2.get(1), "");
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index 850f46d..6525527 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -92,16 +92,19 @@ public class CQLSSTableWriterTest
SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
{
+ private String keyspace;
+
public void init(String keyspace)
{
+ this.keyspace = keyspace;
for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace"))
addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
setPartitioner(StorageService.getPartitioner());
}
- public CFMetaData getCFMetaData(String keyspace, String cfName)
+ public CFMetaData getTableMetadata(String tableName)
{
- return Schema.instance.getCFMetaData(keyspace, cfName);
+ return Schema.instance.getCFMetaData(keyspace, tableName);
}
}, new OutputHandler.SystemOutput(false, false));
@@ -251,16 +254,19 @@ public class CQLSSTableWriterTest
SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
{
+ private String keyspace;
+
public void init(String keyspace)
{
+ this.keyspace = keyspace;
for (Range<Token> range : StorageService.instance.getLocalRanges(KS))
addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
setPartitioner(StorageService.getPartitioner());
}
- public CFMetaData getCFMetaData(String keyspace, String cfName)
+ public CFMetaData getTableMetadata(String tableName)
{
- return Schema.instance.getCFMetaData(keyspace, cfName);
+ return Schema.instance.getCFMetaData(keyspace, tableName);
}
}, new OutputHandler.SystemOutput(false, false));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index b245994..4a51fbd 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -83,16 +83,19 @@ public class SSTableLoaderTest
SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
{
+ private String keyspace;
+
public void init(String keyspace)
{
+ this.keyspace = keyspace;
for (Range<Token> range : StorageService.instance.getLocalRanges(KEYSPACE1))
addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
setPartitioner(StorageService.getPartitioner());
}
- public CFMetaData getCFMetaData(String keyspace, String cfName)
+ public CFMetaData getTableMetadata(String tableName)
{
- return Schema.instance.getCFMetaData(keyspace, cfName);
+ return Schema.instance.getCFMetaData(keyspace, tableName);
}
}, new OutputHandler.SystemOutput(false, false));
[4/4] cassandra git commit: Remove Thrift dependencies in bundled tools
Posted by al...@apache.org.
Remove Thrift dependencies in bundled tools
patch by Philip Thompson; reviewed by Aleksey Yeschenko for CASSANDRA-8358
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f698cc22
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f698cc22
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f698cc22
Branch: refs/heads/trunk
Commit: f698cc228452e847e3ad46bd8178549cf8171767
Parents: 5b61545
Author: Philip Thompson <pt...@gmail.com>
Authored: Tue May 5 21:38:23 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue May 5 23:57:39 2015 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 3 +
build.xml | 8 +-
lib/cassandra-driver-core-2.1.2.jar | Bin 638544 -> 0 bytes
lib/cassandra-driver-core-2.1.5-shaded.jar | Bin 0 -> 1994984 bytes
.../apache/cassandra/db/marshal/TypeParser.java | 71 +-
.../hadoop/AbstractBulkOutputFormat.java | 73 --
.../hadoop/AbstractBulkRecordWriter.java | 239 -------
.../hadoop/AbstractColumnFamilyInputFormat.java | 245 +++----
.../AbstractColumnFamilyOutputFormat.java | 164 -----
.../AbstractColumnFamilyRecordWriter.java | 193 -----
.../cassandra/hadoop/BulkOutputFormat.java | 51 +-
.../cassandra/hadoop/BulkRecordWriter.java | 145 +++-
.../hadoop/ColumnFamilyInputFormat.java | 47 +-
.../hadoop/ColumnFamilyOutputFormat.java | 119 +++-
.../hadoop/ColumnFamilyRecordReader.java | 1 +
.../hadoop/ColumnFamilyRecordWriter.java | 124 +++-
.../hadoop/cql3/CqlBulkOutputFormat.java | 81 ++-
.../hadoop/cql3/CqlBulkRecordWriter.java | 223 ++++--
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 52 +-
.../cassandra/hadoop/cql3/CqlOutputFormat.java | 76 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 93 ++-
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 392 ++++++----
.../cassandra/hadoop/pig/CassandraStorage.java | 706 +++++++++++++++++--
.../cassandra/hadoop/pig/CqlNativeStorage.java | 629 +++++++++++------
.../cassandra/hadoop/pig/StorageHelper.java | 121 ++++
.../cassandra/io/sstable/SSTableLoader.java | 13 +-
.../cassandra/service/StorageService.java | 10 +-
.../org/apache/cassandra/tools/BulkLoader.java | 209 ++----
.../utils/NativeSSTableLoaderClient.java | 126 ++++
.../org/apache/cassandra/pig/CqlTableTest.java | 81 +--
.../io/sstable/CQLSSTableWriterTest.java | 14 +-
.../cassandra/io/sstable/SSTableLoaderTest.java | 7 +-
33 files changed, 2678 insertions(+), 1639 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8f89ece..ab92aa0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Remove Thrift dependencies in bundled tools (CASSANDRA-8358)
* Disable memory mapping of hsperfdata file for JVM statistics (CASSANDRA-9242)
* Add pre-startup checks to detect potential incompatibilities (CASSANDRA-8049)
* Distinguish between null and unset in protocol v4 (CASSANDRA-7304)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 03008de..32351a1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -75,6 +75,9 @@ New features
Upgrading
---------
- Pig's CqlStorage has been removed, use CqlNativeStorage instead
+ - Pig's CassandraStorage has been deprecated. CassandraStorage
+ should only be used against tables created via thrift.
+ Use CqlNativeStorage for all other tables.
- IAuthenticator has been updated to remove responsibility for user/role
maintenance and is now solely responsible for validating credentials.
This is primarily done via SASL, though an optional method exists for
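For Pig jobs the migration is mechanical: replace cassandra:// CassandraStorage URLs with cql:// CqlNativeStorage ones, as the updated CqlTableTest does. A sketch of a migrated LOAD, following that test's conventions (the keyspace, table, and defaultParameters/nativeParameters strings are placeholders taken from the test):

    pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + nativeParameters
        + "&input_cql=select%20*%20from%20cqltable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'"
        + " USING CqlNativeStorage();");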
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index ba99cd9..a5f195f 100644
--- a/build.xml
+++ b/build.xml
@@ -381,7 +381,7 @@
<dependency groupId="io.netty" artifactId="netty-all" version="4.0.23.Final" />
<dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
<dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
- <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.2" />
+ <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.5" classifier="shaded" />
<dependency groupId="org.javassist" artifactId="javassist" version="3.18.2-GA" />
<dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.3.4" />
<dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
@@ -433,7 +433,7 @@
<dependency groupId="org.apache.pig" artifactId="pig"/>
<dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
<dependency groupId="org.antlr" artifactId="antlr"/>
- <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/>
+ <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
<dependency groupId="org.javassist" artifactId="javassist"/>
<dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
<dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/>
@@ -501,12 +501,12 @@
<dependency groupId="org.apache.thrift" artifactId="libthrift"/>
<dependency groupId="org.apache.cassandra" artifactId="cassandra-thrift"/>
-
+
<!-- don't need hadoop classes to run, but if you use the hadoop stuff -->
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
<dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>
- <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" optional="true"/>
+ <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded" optional="true"/>
<!-- don't need jna to run, but nice to have -->
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/lib/cassandra-driver-core-2.1.2.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-2.1.2.jar b/lib/cassandra-driver-core-2.1.2.jar
deleted file mode 100644
index 2095c05..0000000
Binary files a/lib/cassandra-driver-core-2.1.2.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/lib/cassandra-driver-core-2.1.5-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-2.1.5-shaded.jar b/lib/cassandra-driver-core-2.1.5-shaded.jar
new file mode 100644
index 0000000..bb83fb5
Binary files /dev/null and b/lib/cassandra-driver-core-2.1.5-shaded.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/db/marshal/TypeParser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TypeParser.java b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
index ad7ffed..faa678e 100644
--- a/src/java/org/apache/cassandra/db/marshal/TypeParser.java
+++ b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
@@ -21,13 +21,9 @@ import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -42,7 +38,7 @@ public class TypeParser
private int idx;
// A cache of parsed string, specially useful for DynamicCompositeType
- private static final Map<String, AbstractType<?>> cache = new HashMap<String, AbstractType<?>>();
+ private static final Map<String, AbstractType<?>> cache = new HashMap<>();
public static final TypeParser EMPTY_PARSER = new TypeParser("", 0);
@@ -98,9 +94,48 @@ public class TypeParser
return parse(compareWith == null ? null : compareWith.toString());
}
- public static String getShortName(AbstractType<?> type)
+ public static String parseCqlNativeType(String str)
{
- return type.getClass().getSimpleName();
+ return CQL3Type.Native.valueOf(str.trim().toUpperCase(Locale.ENGLISH)).getType().toString();
+ }
+
+ public static String parseCqlCollectionOrFrozenType(String str) throws SyntaxException
+ {
+ str = str.trim().toLowerCase();
+ switch (str)
+ {
+ case "map": return "MapType";
+ case "set": return "SetType";
+ case "list": return "ListType";
+ case "frozen": return "FrozenType";
+ default: throw new SyntaxException("Invalid type name " + str);
+ }
+ }
+
+ /**
+ * Turns user-facing type names into AbstractTypes, e.g. 'text' -> UTF8Type
+ */
+ public static AbstractType<?> parseCqlName(String str) throws SyntaxException, ConfigurationException
+ {
+ return parse(parseCqlNameRecurse(str));
+ }
+
+ private static String parseCqlNameRecurse(String str) throws SyntaxException
+ {
+ if (str.indexOf(',') >= 0 && (!str.contains("<") || (str.indexOf(',') < str.indexOf('<'))))
+ {
+ String[] parseString = str.split(",", 2);
+ return parseCqlNameRecurse(parseString[0]) + "," + parseCqlNameRecurse(parseString[1]);
+ }
+ else if (str.contains("<"))
+ {
+ String[] parseString = str.trim().split("<", 2);
+ return parseCqlCollectionOrFrozenType(parseString[0]) + "(" + parseCqlNameRecurse(parseString[1].substring(0, parseString[1].length()-1)) + ")";
+ }
+ else
+ {
+ return parseCqlNativeType(str);
+ }
}
/**
@@ -126,7 +161,7 @@ public class TypeParser
if (str.charAt(idx) != '(')
throw new IllegalStateException();
- Map<String, String> map = new HashMap<String, String>();
+ Map<String, String> map = new HashMap<>();
++idx; // skipping '('
while (skipBlankAndComma())
@@ -157,7 +192,7 @@ public class TypeParser
public List<AbstractType<?>> getTypeParameters() throws SyntaxException, ConfigurationException
{
- List<AbstractType<?>> list = new ArrayList<AbstractType<?>>();
+ List<AbstractType<?>> list = new ArrayList<>();
if (isEOS())
return list;
@@ -191,7 +226,7 @@ public class TypeParser
public Map<Byte, AbstractType<?>> getAliasParameters() throws SyntaxException, ConfigurationException
{
- Map<Byte, AbstractType<?>> map = new HashMap<Byte, AbstractType<?>>();
+ Map<Byte, AbstractType<?>> map = new HashMap<>();
if (isEOS())
return map;
@@ -384,11 +419,7 @@ public class TypeParser
Field field = typeClass.getDeclaredField("instance");
return (AbstractType<?>) field.get(null);
}
- catch (NoSuchFieldException e)
- {
- throw new ConfigurationException("Invalid comparator class " + typeClass.getName() + ": must define a public static instance field or a public static method getInstance(TypeParser).");
- }
- catch (IllegalAccessException e)
+ catch (NoSuchFieldException | IllegalAccessException e)
{
throw new ConfigurationException("Invalid comparator class " + typeClass.getName() + ": must define a public static instance field or a public static method getInstance(TypeParser).");
}
@@ -489,12 +520,6 @@ public class TypeParser
return str.substring(i, idx);
}
- public char readNextChar()
- {
- skipBlank();
- return str.charAt(idx++);
- }
-
/**
* Helper function to ease the writing of AbstractType.toString() methods.
*/
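The new parseCqlName entry point rewrites user-facing CQL type names into the internal marshal representation before delegating to parse(). A rough illustration of the mapping (class names abbreviated in the comments; the real strings are fully qualified):

    // 'text' -> UTF8Type
    AbstractType<?> t1 = TypeParser.parseCqlName("text");
    // collections and frozen types recurse:
    // map<text, frozen<list<int>>> -> MapType(UTF8Type,FrozenType(ListType(Int32Type)))
    AbstractType<?> t2 = TypeParser.parseCqlName("map<text, frozen<list<int>>>");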
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
deleted file mode 100644
index c0e91da..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
-
-public abstract class AbstractBulkOutputFormat<K, V> extends OutputFormat<K, V>
- implements org.apache.hadoop.mapred.OutputFormat<K, V>
-{
- @Override
- public void checkOutputSpecs(JobContext context)
- {
- checkOutputSpecs(HadoopCompat.getConfiguration(context));
- }
-
- private void checkOutputSpecs(Configuration conf)
- {
- if (ConfigHelper.getOutputKeyspace(conf) == null)
- {
- throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
- }
- }
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
- {
- return new NullOutputCommitter();
- }
-
- /** Fills the deprecated OutputFormat interface for streaming. */
- @Deprecated
- public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
- {
- checkOutputSpecs(job);
- }
-
- public static class NullOutputCommitter extends OutputCommitter
- {
- public void abortTask(TaskAttemptContext taskContext) { }
-
- public void cleanupJob(JobContext jobContext) { }
-
- public void commitTask(TaskAttemptContext taskContext) { }
-
- public boolean needsTaskCommit(TaskAttemptContext taskContext)
- {
- return false;
- }
-
- public void setupJob(JobContext jobContext) { }
-
- public void setupTask(TaskAttemptContext taskContext) { }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
deleted file mode 100644
index 5ba0a96..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.SSTableLoader;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.OutputHandler;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.Progressable;
-
-public abstract class AbstractBulkRecordWriter<K, V> extends RecordWriter<K, V>
-implements org.apache.hadoop.mapred.RecordWriter<K, V>
-{
- public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
- public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
- public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
- public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
-
- private final Logger logger = LoggerFactory.getLogger(AbstractBulkRecordWriter.class);
-
- protected final Configuration conf;
- protected final int maxFailures;
- protected final int bufferSize;
- protected Closeable writer;
- protected SSTableLoader loader;
- protected Progressable progress;
- protected TaskAttemptContext context;
-
- protected AbstractBulkRecordWriter(TaskAttemptContext context)
- {
- this(HadoopCompat.getConfiguration(context));
- this.context = context;
- }
-
- protected AbstractBulkRecordWriter(Configuration conf, Progressable progress)
- {
- this(conf);
- this.progress = progress;
- }
-
- protected AbstractBulkRecordWriter(Configuration conf)
- {
- Config.setOutboundBindAny(true);
- this.conf = conf;
- DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
- maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
- bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
- }
-
- protected String getOutputLocation() throws IOException
- {
- String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
- if (dir == null)
- throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
- return dir;
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException
- {
- close();
- }
-
- /** Fills the deprecated RecordWriter interface for streaming. */
- @Deprecated
- public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
- {
- close();
- }
-
- private void close() throws IOException
- {
- if (writer != null)
- {
- writer.close();
- Future<StreamState> future = loader.stream();
- while (true)
- {
- try
- {
- future.get(1000, TimeUnit.MILLISECONDS);
- break;
- }
- catch (ExecutionException | TimeoutException te)
- {
- if (null != progress)
- progress.progress();
- if (null != context)
- HadoopCompat.progress(context);
- }
- catch (InterruptedException e)
- {
- throw new IOException(e);
- }
- }
- if (loader.getFailedHosts().size() > 0)
- {
- if (loader.getFailedHosts().size() > maxFailures)
- throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
- else
- logger.warn("Some hosts failed: {}", loader.getFailedHosts());
- }
- }
- }
-
- public static class ExternalClient extends SSTableLoader.Client
- {
- private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
- private final Configuration conf;
- private final String hostlist;
- private final int rpcPort;
- private final String username;
- private final String password;
-
- public ExternalClient(Configuration conf)
- {
- super();
- this.conf = conf;
- this.hostlist = ConfigHelper.getOutputInitialAddress(conf);
- this.rpcPort = ConfigHelper.getOutputRpcPort(conf);
- this.username = ConfigHelper.getOutputKeyspaceUserName(conf);
- this.password = ConfigHelper.getOutputKeyspacePassword(conf);
- }
-
- public void init(String keyspace)
- {
- String[] nodes = hostlist.split(",");
- Set<InetAddress> hosts = new HashSet<InetAddress>(nodes.length);
- for (String node : nodes)
- {
- try
- {
- hosts.add(InetAddress.getByName(node));
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException(e);
- }
- }
- Iterator<InetAddress> hostiter = hosts.iterator();
- while (hostiter.hasNext())
- {
- try
- {
- InetAddress host = hostiter.next();
- Cassandra.Client client = ConfigHelper.createConnection(conf, host.getHostAddress(), rpcPort);
-
- // log in
- client.set_keyspace(keyspace);
- if (username != null)
- {
- Map<String, String> creds = new HashMap<String, String>();
- creds.put(PasswordAuthenticator.USERNAME_KEY, username);
- creds.put(PasswordAuthenticator.PASSWORD_KEY, password);
- AuthenticationRequest authRequest = new AuthenticationRequest(creds);
- client.login(authRequest);
- }
-
- List<TokenRange> tokenRanges = client.describe_ring(keyspace);
- List<KsDef> ksDefs = client.describe_keyspaces();
-
- setPartitioner(client.describe_partitioner());
- Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
-
- for (TokenRange tr : tokenRanges)
- {
- Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
- for (String ep : tr.endpoints)
- {
- addRangeForEndpoint(range, InetAddress.getByName(ep));
- }
- }
-
- for (KsDef ksDef : ksDefs)
- {
- Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
- for (CfDef cfDef : ksDef.cf_defs)
- cfs.put(cfDef.name, ThriftConversion.fromThrift(cfDef));
- knownCfs.put(ksDef.name, cfs);
- }
- break;
- }
- catch (Exception e)
- {
- if (!hostiter.hasNext())
- throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
- }
- }
- }
-
- public CFMetaData getCFMetaData(String keyspace, String cfName)
- {
- Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
- return cfs != null ? cfs.get(cfName) : null;
- }
- }
-
- public static class NullOutputHandler implements OutputHandler
- {
- public void output(String msg) {}
- public void debug(String msg) {}
- public void warn(String msg) {}
- public void warn(String msg, Throwable th) {}
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 691bd76..2ef4cf4 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -18,31 +18,27 @@
package org.apache.cassandra.hadoop;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.*;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.auth.PasswordAuthenticator;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.hadoop.cql3.*;
+import org.apache.cassandra.thrift.KeyRange;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.TApplicationException;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y>
{
@@ -51,7 +47,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
public static final String MAPRED_TASK_ID = "mapred.task.id";
// The simple fact that we need this is because the old Hadoop API wants us to "write"
// to the key and value whereas the new asks for it.
- // I choose 8kb as the default max key size (instanciated only once), but you can
+ // I choose 8kb as the default max key size (instantiated only once), but you can
// override it in your jobConf with this setting.
public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = "cassandra.hadoop.max_key_size";
public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
@@ -59,6 +55,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
private String keyspace;
private String cfName;
private IPartitioner partitioner;
+ private Session session;
protected void validateConfiguration(Configuration conf)
{
@@ -72,57 +69,27 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner");
}
- public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception
- {
- logger.debug("Creating authenticated client for CF input format");
- TTransport transport;
- try
- {
- transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port);
- }
- catch (Exception e)
- {
- throw new TTransportException("Failed to open a transport to " + location + ":" + port + ".", e);
- }
- TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
- Cassandra.Client client = new Cassandra.Client(binaryProtocol);
-
- // log in
- client.set_keyspace(ConfigHelper.getInputKeyspace(conf));
- if ((ConfigHelper.getInputKeyspaceUserName(conf) != null) && (ConfigHelper.getInputKeyspacePassword(conf) != null))
- {
- Map<String, String> creds = new HashMap<String, String>();
- creds.put(PasswordAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
- creds.put(PasswordAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
- AuthenticationRequest authRequest = new AuthenticationRequest(creds);
- client.login(authRequest);
- }
- logger.debug("Authenticated client for CF input format created successfully");
- return client;
- }
-
public List<InputSplit> getSplits(JobContext context) throws IOException
{
- Configuration conf = HadoopCompat.getConfiguration(context);;
+ Configuration conf = HadoopCompat.getConfiguration(context);
validateConfiguration(conf);
- // cannonical ranges and nodes holding replicas
- List<TokenRange> masterRangeNodes = getRangeMap(conf);
-
keyspace = ConfigHelper.getInputKeyspace(conf);
cfName = ConfigHelper.getInputColumnFamily(conf);
partitioner = ConfigHelper.getInputPartitioner(conf);
logger.debug("partitioner is {}", partitioner);
+ // canonical ranges and nodes holding replicas
+ Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, keyspace);
- // cannonical ranges, split into pieces, fetching the splits in parallel
+ // canonical ranges, split into pieces, fetching the splits in parallel
ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- List<InputSplit> splits = new ArrayList<InputSplit>();
+ List<InputSplit> splits = new ArrayList<>();
try
{
- List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
+ List<Future<List<InputSplit>>> splitfutures = new ArrayList<>();
KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
Range<Token> jobRange = null;
if (jobKeyRange != null)
@@ -130,7 +97,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
if (jobKeyRange.start_key != null)
{
if (!partitioner.preservesOrder())
- throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving paritioner");
+ throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner");
if (jobKeyRange.start_token != null)
throw new IllegalArgumentException("only start_key supported");
if (jobKeyRange.end_token != null)
@@ -149,26 +116,25 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
}
}
- for (TokenRange range : masterRangeNodes)
+ session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
+ Metadata metadata = session.getCluster().getMetadata();
+
+ for (TokenRange range : masterRangeNodes.keySet())
{
if (jobRange == null)
{
- // for each range, pick a live owner and ask it to compute bite-sized splits
- splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+ // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
+ splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf)));
}
else
{
- Range<Token> dhtRange = new Range<Token>(partitioner.getTokenFactory().fromString(range.start_token),
- partitioner.getTokenFactory().fromString(range.end_token));
-
- if (dhtRange.intersects(jobRange))
+ TokenRange jobTokenRange = rangeToTokenRange(metadata, jobRange);
+ if (range.intersects(jobTokenRange))
{
- for (Range<Token> intersection: dhtRange.intersectionWith(jobRange))
+ for (TokenRange intersection: range.intersectWith(jobTokenRange))
{
- range.start_token = partitioner.getTokenFactory().toString(intersection.left);
- range.end_token = partitioner.getTokenFactory().toString(intersection.right);
- // for each range, pick a live owner and ask it to compute bite-sized splits
- splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+ // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
+ splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf)));
}
}
}
@@ -197,53 +163,53 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
return splits;
}
+ private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
+ {
+ return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
+ metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
+ }
+
/**
- * Gets a token range and splits it up according to the suggested
+ * Gets a TokenRange and splits it up according to the suggested
* size into input splits that Hadoop can use.
*/
class SplitCallable implements Callable<List<InputSplit>>
{
- private final TokenRange range;
+ private final TokenRange tokenRange;
+ private final Set<Host> hosts;
private final Configuration conf;
- public SplitCallable(TokenRange tr, Configuration conf)
+ public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf)
{
- this.range = tr;
+ this.tokenRange = tr;
+ this.hosts = hosts;
this.conf = conf;
}
public List<InputSplit> call() throws Exception
{
- ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
- List<CfSplit> subSplits = getSubSplits(keyspace, cfName, range, conf);
- assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size";
+ ArrayList<InputSplit> splits = new ArrayList<>();
+ Map<TokenRange, Long> subSplits;
+ subSplits = getSubSplits(keyspace, cfName, tokenRange, conf);
// turn the sub-ranges into InputSplits
- String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
+ String[] endpoints = new String[hosts.size()];
+
// hadoop needs hostname, not ip
int endpointIndex = 0;
- for (String endpoint: range.rpc_endpoints)
- {
- String endpoint_address = endpoint;
- if (endpoint_address == null || endpoint_address.equals("0.0.0.0"))
- endpoint_address = range.endpoints.get(endpointIndex);
- endpoints[endpointIndex++] = InetAddress.getByName(endpoint_address).getHostName();
- }
+ for (Host endpoint : hosts)
+ endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
- Token.TokenFactory factory = partitioner.getTokenFactory();
- for (CfSplit subSplit : subSplits)
+ for (TokenRange subSplit : subSplits.keySet())
{
- Token left = factory.fromString(subSplit.getStart_token());
- Token right = factory.fromString(subSplit.getEnd_token());
- Range<Token> range = new Range<Token>(left, right);
- List<Range<Token>> ranges = range.isWrapAround() ? range.unwrap() : ImmutableList.of(range);
- for (Range<Token> subrange : ranges)
+ List<TokenRange> ranges = subSplit.unwrap();
+ for (TokenRange subrange : ranges)
{
ColumnFamilySplit split =
new ColumnFamilySplit(
- factory.toString(subrange.left),
- factory.toString(subrange.right),
- subSplit.getRow_count(),
+ subrange.getStart().toString().substring(2),
+ subrange.getEnd().toString().substring(2),
+ subSplits.get(subSplit),
endpoints);
logger.debug("adding {}", split);
@@ -254,80 +220,63 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
}
}
- private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+ private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
{
- int splitsize = ConfigHelper.getInputSplitSize(conf);
- for (int i = 0; i < range.rpc_endpoints.size(); i++)
+ int splitSize = ConfigHelper.getInputSplitSize(conf);
+ try
{
- String host = range.rpc_endpoints.get(i);
-
- if (host == null || host.equals("0.0.0.0"))
- host = range.endpoints.get(i);
-
- try
- {
- Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf));
- client.set_keyspace(keyspace);
-
- try
- {
- return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize);
- }
- catch (TApplicationException e)
- {
- // fallback to guessing split size if talking to a server without describe_splits_ex method
- if (e.getType() == TApplicationException.UNKNOWN_METHOD)
- {
- List<String> splitPoints = client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
- return tokenListToSplits(splitPoints, splitsize);
- }
- throw e;
- }
- }
- catch (IOException e)
- {
- logger.debug("failed connect to endpoint {}", host, e);
- }
- catch (InvalidRequestException e)
- {
- throw new RuntimeException(e);
- }
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
+ return describeSplits(keyspace, cfName, range, splitSize);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
}
- throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
}
- private List<CfSplit> tokenListToSplits(List<String> splitTokens, int splitsize)
+ private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
{
- List<CfSplit> splits = Lists.newArrayListWithExpectedSize(splitTokens.size() - 1);
- for (int j = 0; j < splitTokens.size() - 1; j++)
- splits.add(new CfSplit(splitTokens.get(j), splitTokens.get(j + 1), splitsize));
- return splits;
+ Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
+
+ Map<TokenRange, Set<Host>> map = new HashMap<>();
+ Metadata metadata = session.getCluster().getMetadata();
+ for (TokenRange tokenRange : metadata.getTokenRanges())
+ map.put(tokenRange, metadata.getReplicas(keyspace, tokenRange));
+ return map;
}
- private List<TokenRange> getRangeMap(Configuration conf) throws IOException
+ private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
{
- Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
-
- List<TokenRange> map;
- try
+ String query = String.format("SELECT mean_partition_size, partitions_count " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
+ SystemKeyspace.NAME,
+ SystemKeyspace.SIZE_ESTIMATES);
+
+ ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString());
+
+ Row row = resultSet.one();
+ // If we have no data on this split, return the full split, i.e. do not sub-split
+ // Assume smallest granularity of partition count available from CASSANDRA-7688
+ if (row == null)
{
- map = client.describe_local_ring(ConfigHelper.getInputKeyspace(conf));
+ Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
+ wrappedTokenRange.put(tokenRange, (long) 128);
+ return wrappedTokenRange;
}
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
-
- return map;
+
+ long meanPartitionSize = row.getLong("mean_partition_size");
+ long partitionCount = row.getLong("partitions_count");
+
+ int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize);
+ List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
+ Map<TokenRange, Long> rangesWithLength = new HashMap<>();
+ for (TokenRange range : splitRanges)
+ rangesWithLength.put(range, partitionCount/splitCount);
+
+ return rangesWithLength;
}
- //
// Old Hadoop API
- //
public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
{
TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID());
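The arithmetic behind the new describeSplits() is easy to sanity-check in isolation. The sketch below is illustrative only (plain Java, no driver involved); estimateSplits() is a hypothetical helper, not part of this commit, and no particular units for the split size are asserted:

public final class SplitMath
{
    // splitCount = (mean partition size * partition count) / requested split
    // size, the expression used by describeSplits() above. With no
    // system.size_estimates row, the patch instead keeps the range whole and
    // assumes 128 partitions (the smallest granularity from CASSANDRA-7688).
    static int estimateSplits(long meanPartitionSize, long partitionCount, int splitSize)
    {
        return (int) ((meanPartitionSize * partitionCount) / splitSize);
    }

    public static void main(String[] args)
    {
        int splitCount = estimateSplits(1024, 64_000, 65_536);
        System.out.println(splitCount);          // 1000 sub-ranges for splitEvenly()
        System.out.println(64_000 / splitCount); // 64 partitions attributed to each
        // nothing in the patch guards splitCount == 0 for very small estimates
    }
}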
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
deleted file mode 100644
index 03d0045..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.thrift.AuthenticationRequest;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-
-/**
- * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
- * OutputFormat that allows reduce tasks to store keys (and corresponding
- * values) as Cassandra rows (and respective columns) in a given
- * ColumnFamily.
- *
- * <p>
- * As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
- * Keyspace and ColumnFamily in your
- * Hadoop job Configuration. The {@link ConfigHelper} class, through its
- * {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this
- * simple.
- * </p>
- *
- * <p>
- * For the sake of performance, this class employs a lazy write-back caching
- * mechanism, where its record writer batches mutations created based on the
- * reduce's inputs (in a task-specific map), and periodically makes the changes
- * official by sending a batch mutate request to Cassandra.
- * </p>
- * @param <Y>
- */
-public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputFormat<K, Y> implements org.apache.hadoop.mapred.OutputFormat<K, Y>
-{
- public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
- public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size";
- private static final Logger logger = LoggerFactory.getLogger(AbstractColumnFamilyOutputFormat.class);
-
-
- /**
- * Check for validity of the output-specification for the job.
- *
- * @param context
- * information about the job
- */
- public void checkOutputSpecs(JobContext context)
- {
- checkOutputSpecs(HadoopCompat.getConfiguration(context));
- }
-
- protected void checkOutputSpecs(Configuration conf)
- {
- if (ConfigHelper.getOutputKeyspace(conf) == null)
- throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
- if (ConfigHelper.getOutputPartitioner(conf) == null)
- throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
- if (ConfigHelper.getOutputInitialAddress(conf) == null)
- throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
- }
-
- /** Fills the deprecated OutputFormat interface for streaming. */
- @Deprecated
- public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
- {
- checkOutputSpecs(job);
- }
-
- /**
- * The OutputCommitter for this format does not write any data to the DFS.
- *
- * @param context
- * the task context
- * @return an output committer
- * @throws IOException
- * @throws InterruptedException
- */
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
- {
- return new NullOutputCommitter();
- }
-
- /**
- * Connects to the given server:port and returns a client based on the given socket that points to the configured
- * keyspace, and is logged in with the configured credentials.
- *
- * @param host fully qualified host name to connect to
- * @param port RPC port of the server
- * @param conf a job configuration
- * @return a cassandra client
- * @throws Exception set of thrown exceptions may be implementation defined,
- * depending on the used transport factory
- */
- public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
- {
- logger.debug("Creating authenticated client for CF output format");
- TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port);
- TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
- Cassandra.Client client = new Cassandra.Client(binaryProtocol);
- client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
- String user = ConfigHelper.getOutputKeyspaceUserName(conf);
- String password = ConfigHelper.getOutputKeyspacePassword(conf);
- if ((user != null) && (password != null))
- login(user, password, client);
-
- logger.debug("Authenticated client for CF output format created successfully");
- return client;
- }
-
- public static void login(String user, String password, Cassandra.Client client) throws Exception
- {
- Map<String, String> creds = new HashMap<String, String>();
- creds.put(PasswordAuthenticator.USERNAME_KEY, user);
- creds.put(PasswordAuthenticator.PASSWORD_KEY, password);
- AuthenticationRequest authRequest = new AuthenticationRequest(creds);
- client.login(authRequest);
- }
-
- /**
- * An {@link OutputCommitter} that does nothing.
- */
- private static class NullOutputCommitter extends OutputCommitter
- {
- public void abortTask(TaskAttemptContext taskContext) { }
-
- public void cleanupJob(JobContext jobContext) { }
-
- public void commitTask(TaskAttemptContext taskContext) { }
-
- public boolean needsTaskCommit(TaskAttemptContext taskContext)
- {
- return false;
- }
-
- public void setupJob(JobContext jobContext) { }
-
- public void setupTask(TaskAttemptContext taskContext) { }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
deleted file mode 100644
index cb44beb..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.client.RingCache;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.thrift.transport.TTransport;
-import org.apache.hadoop.util.Progressable;
-
-
-/**
- * The <code>ColumnFamilyRecordWriter</code> maps the output <key, value>
- * pairs to a Cassandra column family. In particular, it applies all mutations
- * in the value, which it associates with the key, and in turn the responsible
- * endpoint.
- *
- * <p>
- * Furthermore, this writer groups the mutations by the endpoint responsible for
- * the rows being affected. This allows the mutations to be executed in parallel,
- * directly to a responsible endpoint.
- * </p>
- *
- * @see ColumnFamilyOutputFormat
- */
-public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWriter<K, Y> implements org.apache.hadoop.mapred.RecordWriter<K, Y>
-{
- // The configuration this writer is associated with.
- protected final Configuration conf;
-
- // The ring cache that describes the token ranges each node in the ring is
- // responsible for. This is what allows us to group the mutations by
- // the endpoints they should be targeted at. The targeted endpoint
- // essentially
- // acts as the primary replica for the rows being affected by the mutations.
- protected final RingCache ringCache;
-
- // The number of mutations to buffer per endpoint
- protected final int queueSize;
-
- protected final long batchThreshold;
-
- protected final ConsistencyLevel consistencyLevel;
- protected Progressable progressable;
- protected TaskAttemptContext context;
-
- protected AbstractColumnFamilyRecordWriter(Configuration conf)
- {
- this.conf = conf;
- this.ringCache = new RingCache(conf);
- this.queueSize = conf.getInt(AbstractColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
- batchThreshold = conf.getLong(AbstractColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
- consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
- }
-
- /**
- * Close this <code>RecordWriter</code> to future operations, but not before
- * flushing out the batched mutations.
- *
- * @param context the context of the task
- * @throws IOException
- */
- public void close(TaskAttemptContext context) throws IOException, InterruptedException
- {
- close();
- }
-
- /** Fills the deprecated RecordWriter interface for streaming. */
- @Deprecated
- public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
- {
- close();
- }
-
- protected abstract void close() throws IOException;
-
- /**
- * A client that runs in a threadpool and connects to the list of endpoints for a particular
- * range. Mutations for keys in that range are sent to this client via a queue.
- */
- public abstract class AbstractRangeClient<K> extends Thread
- {
- // The list of endpoints for this range
- protected final List<InetAddress> endpoints;
- // A bounded queue of incoming mutations for this range
- protected final BlockingQueue<K> queue = new ArrayBlockingQueue<K>(queueSize);
-
- protected volatile boolean run = true;
- // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
- // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls,
- // when the client is closed.
- protected volatile IOException lastException;
-
- protected Cassandra.Client client;
-
- /**
- * Constructs an {@link AbstractRangeClient} for the given endpoints.
- * @param endpoints the possible endpoints to execute the mutations on
- */
- public AbstractRangeClient(List<InetAddress> endpoints)
- {
- super("client-" + endpoints);
- this.endpoints = endpoints;
- }
-
- /**
- * enqueues the given value to Cassandra
- */
- public void put(K value) throws IOException
- {
- while (true)
- {
- if (lastException != null)
- throw lastException;
- try
- {
- if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
- break;
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- }
- }
-
- public void close() throws IOException
- {
- // stop the run loop. this will result in closeInternal being called by the time join() finishes.
- run = false;
- interrupt();
- try
- {
- this.join();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
-
- if (lastException != null)
- throw lastException;
- }
-
- protected void closeInternal()
- {
- if (client != null)
- {
- TTransport transport = client.getOutputProtocol().getTransport();
- if (transport.isOpen())
- transport.close();
- }
- }
-
- /**
- * Loops collecting mutations from the queue and sending to Cassandra
- */
- public abstract void run();
-
- @Override
- public String toString()
- {
- return "#<Client for " + endpoints + ">";
- }
- }
-}
-
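The AbstractRangeClient removed above is re-inlined into ColumnFamilyRecordWriter later in this patch, and its producer-side pattern is worth calling out: a bounded queue combined with a recorded lastException, so a failure on the consumer thread resurfaces on the caller's stack at the next put(). A stripped-down, generic sketch of just that pattern (no Thrift; class name hypothetical):

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class BoundedProducer<T>
{
    // bounded queue: offer() with a short timeout instead of a blocking put(),
    // so the producer periodically rechecks for a consumer-side failure
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(32);
    private volatile IOException lastException; // set by the consumer thread

    void put(T value) throws IOException
    {
        while (true)
        {
            if (lastException != null)
                throw lastException; // surface the consumer's failure here
            try
            {
                if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
                    break;
            }
            catch (InterruptedException e)
            {
                throw new AssertionError(e);
            }
        }
    }
}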
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index f5a5a8d..5282279 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -23,9 +23,12 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.thrift.Mutation;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
-public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<Mutation>>
+@Deprecated
+public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
+ implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
{
/** Fills the deprecated OutputFormat interface for streaming. */
@Deprecated
@@ -39,4 +42,50 @@ public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<M
{
return new BulkRecordWriter(context);
}
+
+
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ {
+ checkOutputSpecs(HadoopCompat.getConfiguration(context));
+ }
+
+ private void checkOutputSpecs(Configuration conf)
+ {
+ if (ConfigHelper.getOutputKeyspace(conf) == null)
+ {
+ throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
+ }
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new NullOutputCommitter();
+ }
+
+ /** Fills the deprecated OutputFormat interface for streaming. */
+ @Deprecated
+ public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+ {
+ checkOutputSpecs(job);
+ }
+
+ public static class NullOutputCommitter extends OutputCommitter
+ {
+ public void abortTask(TaskAttemptContext taskContext) { }
+
+ public void cleanupJob(JobContext jobContext) { }
+
+ public void commitTask(TaskAttemptContext taskContext) { }
+
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ {
+ return false;
+ }
+
+ public void setupJob(JobContext jobContext) { }
+
+ public void setupTask(TaskAttemptContext taskContext) { }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index d67b856..6b9ecb5 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -17,24 +17,57 @@
*/
package org.apache.cassandra.hadoop;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
+import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.CounterColumn;
import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.utils.NativeSSTableLoaderClient;
+import org.apache.cassandra.utils.OutputHandler;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
-public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, List<Mutation>>
+@Deprecated
+public final class BulkRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>>
+ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>>
{
+ public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+ public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+ public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+ public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+
+ private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
+
+ protected final Configuration conf;
+ protected final int maxFailures;
+ protected final int bufferSize;
+ protected Closeable writer;
+ protected SSTableLoader loader;
+ protected Progressable progress;
+ protected TaskAttemptContext context;
private File outputDir;
@@ -55,17 +88,32 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer,
BulkRecordWriter(TaskAttemptContext context)
{
- super(context);
+
+ this(HadoopCompat.getConfiguration(context));
+ this.context = context;
}
BulkRecordWriter(Configuration conf, Progressable progress)
{
- super(conf, progress);
+ this(conf);
+ this.progress = progress;
}
BulkRecordWriter(Configuration conf)
{
- super(conf);
+ Config.setOutboundBindAny(true);
+ this.conf = conf;
+ DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
+ maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
+ bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
+ }
+
+ protected String getOutputLocation() throws IOException
+ {
+ String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
+ if (dir == null)
+ throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
+ return dir;
}
private void setTypes(Mutation mutation)
@@ -115,6 +163,54 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer,
}
@Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ close();
+ }
+
+ /** Fills the deprecated RecordWriter interface for streaming. */
+ @Deprecated
+ public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+ {
+ close();
+ }
+
+ private void close() throws IOException
+ {
+ if (writer != null)
+ {
+ writer.close();
+ Future<StreamState> future = loader.stream();
+ while (true)
+ {
+ try
+ {
+ future.get(1000, TimeUnit.MILLISECONDS);
+ break;
+ }
+ catch (ExecutionException | TimeoutException te)
+ {
+ if (null != progress)
+ progress.progress();
+ if (null != context)
+ HadoopCompat.progress(context);
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
+ }
+ }
+ if (loader.getFailedHosts().size() > 0)
+ {
+ if (loader.getFailedHosts().size() > maxFailures)
+ throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
+ else
+ logger.warn("Some hosts failed: {}", loader.getFailedHosts());
+ }
+ }
+ }
+
+ @Override
public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException
{
setTypes(value.get(0));
@@ -158,4 +254,43 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer,
HadoopCompat.progress(context);
}
}
+
+ public static class ExternalClient extends NativeSSTableLoaderClient
+ {
+ public ExternalClient(Configuration conf)
+ {
+ super(resolveHostAddresses(conf),
+ CqlConfigHelper.getOutputNativePort(conf),
+ ConfigHelper.getOutputKeyspaceUserName(conf),
+ ConfigHelper.getOutputKeyspacePassword(conf),
+ CqlConfigHelper.getSSLOptions(conf).orNull());
+ }
+
+ private static Collection<InetAddress> resolveHostAddresses(Configuration conf)
+ {
+ Set<InetAddress> addresses = new HashSet<>();
+
+ for (String host : ConfigHelper.getOutputInitialAddress(conf).split(","))
+ {
+ try
+ {
+ addresses.add(InetAddress.getByName(host));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return addresses;
+ }
+ }
+
+ public static class NullOutputHandler implements OutputHandler
+ {
+ public void output(String msg) {}
+ public void debug(String msg) {}
+ public void warn(String msg) {}
+ public void warn(String msg, Throwable th) {}
+ }
}
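The close() path above waits on the stream future in one-second slices so that a long SSTable stream keeps reporting progress and Hadoop does not kill the task. A minimal, self-contained sketch of that keep-alive pattern, with a Runnable standing in for progress.progress()/HadoopCompat.progress(context); note that, unlike the patch (which catches ExecutionException | TimeoutException together and keeps polling), this sketch deliberately lets execution failures propagate:

import java.util.concurrent.*;

final class StreamWait
{
    // poll the future in short slices, emitting a heartbeat on each timeout
    static <T> T awaitReportingProgress(Future<T> future, Runnable heartbeat) throws ExecutionException
    {
        while (true)
        {
            try
            {
                return future.get(1000, TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException te)
            {
                heartbeat.run(); // keeps the Hadoop task alive while streaming
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }
}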
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 686d486..88dd2e2 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -21,11 +21,23 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.auth.PasswordAuthenticator;
import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.thrift.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
/**
* Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
@@ -44,9 +56,40 @@ import org.apache.hadoop.mapreduce.*;
*
* The default split size is 64k rows.
*/
+@Deprecated
public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Cell>>
{
-
+ private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
+
+ public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception
+ {
+ logger.debug("Creating authenticated client for CF input format");
+ TTransport transport;
+ try
+ {
+ transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port);
+ }
+ catch (Exception e)
+ {
+ throw new TTransportException("Failed to open a transport to " + location + ":" + port + ".", e);
+ }
+ TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
+ Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+
+ // log in
+ client.set_keyspace(ConfigHelper.getInputKeyspace(conf));
+ if ((ConfigHelper.getInputKeyspaceUserName(conf) != null) && (ConfigHelper.getInputKeyspacePassword(conf) != null))
+ {
+ Map<String, String> creds = new HashMap<String, String>();
+ creds.put(PasswordAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
+ creds.put(PasswordAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
+ AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+ client.login(authRequest);
+ }
+ logger.debug("Authenticated client for CF input format created successfully");
+ return client;
+ }
+
public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
{
return new ColumnFamilyRecordReader();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index 2990bf3..94ced69 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -18,10 +18,18 @@
package org.apache.cassandra.hadoop;
+import java.io.*;
import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.*;
+
+import org.slf4j.*;
+
+import org.apache.cassandra.auth.*;
import org.apache.cassandra.thrift.*;
+import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapreduce.*;
+import org.apache.thrift.protocol.*;
+import org.apache.thrift.transport.*;
/**
* The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
@@ -44,8 +52,93 @@ import org.apache.hadoop.mapreduce.*;
* official by sending a batch mutate request to Cassandra.
* </p>
*/
-public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<ByteBuffer,List<Mutation>>
+@Deprecated
+public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
+ implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
{
+ public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
+ public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size";
+
+ private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
+
+ /**
+ * The OutputCommitter for this format does not write any data to the DFS.
+ *
+ * @param context
+ * the task context
+ * @return an output committer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new NullOutputCommitter();
+ }
+
+ /**
+ * Check for validity of the output-specification for the job.
+ *
+ * @param context
+ * information about the job
+ */
+ public void checkOutputSpecs(JobContext context)
+ {
+ checkOutputSpecs(HadoopCompat.getConfiguration(context));
+ }
+
+ protected void checkOutputSpecs(Configuration conf)
+ {
+ if (ConfigHelper.getOutputKeyspace(conf) == null)
+ throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
+ if (ConfigHelper.getOutputPartitioner(conf) == null)
+ throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
+ if (ConfigHelper.getOutputInitialAddress(conf) == null)
+ throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
+ }
+
+ /** Fills the deprecated OutputFormat interface for streaming. */
+ @Deprecated
+ public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+ {
+ checkOutputSpecs(job);
+ }
+
+ /**
+ * Connects to the given server:port and returns a client based on the given socket that points to the configured
+ * keyspace, and is logged in with the configured credentials.
+ *
+ * @param host fully qualified host name to connect to
+ * @param port RPC port of the server
+ * @param conf a job configuration
+ * @return a cassandra client
+ * @throws Exception set of thrown exceptions may be implementation defined,
+ * depending on the used transport factory
+ */
+ public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
+ {
+ logger.debug("Creating authenticated client for CF output format");
+ TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port);
+ TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
+ Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+ client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
+ String user = ConfigHelper.getOutputKeyspaceUserName(conf);
+ String password = ConfigHelper.getOutputKeyspacePassword(conf);
+ if ((user != null) && (password != null))
+ login(user, password, client);
+
+ logger.debug("Authenticated client for CF output format created successfully");
+ return client;
+ }
+
+ public static void login(String user, String password, Cassandra.Client client) throws Exception
+ {
+ Map<String, String> creds = new HashMap<String, String>();
+ creds.put(PasswordAuthenticator.USERNAME_KEY, user);
+ creds.put(PasswordAuthenticator.PASSWORD_KEY, password);
+ AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+ client.login(authRequest);
+ }
+
/** Fills the deprecated OutputFormat interface for streaming. */
@Deprecated
public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress)
@@ -64,4 +157,26 @@ public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<B
{
return new ColumnFamilyRecordWriter(context);
}
+
+ /**
+ * An {@link OutputCommitter} that does nothing.
+ */
+ private static class NullOutputCommitter extends OutputCommitter
+ {
+ public void abortTask(TaskAttemptContext taskContext) { }
+
+ public void cleanupJob(JobContext jobContext) { }
+
+ public void commitTask(TaskAttemptContext taskContext) { }
+
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ {
+ return false;
+ }
+
+ public void setupJob(JobContext jobContext) { }
+
+ public void setupTask(TaskAttemptContext taskContext) { }
+ }
+
}
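For jobs still on this deprecated Thrift path, the factory above is driven entirely from the job Configuration. A hedged usage sketch ("ks1", "node1", and port 9160 are placeholders, not values taken from this commit):

import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.hadoop.conf.Configuration;

public final class ThriftOutputSetup
{
    public static Cassandra.Client connect() throws Exception
    {
        Configuration conf = new Configuration();
        // the three settings checkOutputSpecs() above insists on
        ConfigHelper.setOutputKeyspace(conf, "ks1");
        ConfigHelper.setOutputInitialAddress(conf, "node1");
        ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner");
        // login() is invoked internally when both user name and password
        // are present in the configuration
        return ColumnFamilyOutputFormat.createAuthenticatedClient("node1", 9160, conf);
    }
}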
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 35437e9..d205f13 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
+@Deprecated
public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index d6a873b..31c7047 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -22,15 +22,19 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.*;
+import org.apache.cassandra.client.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.utils.*;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.*;
import org.apache.thrift.TException;
import org.apache.hadoop.util.Progressable;
+import org.apache.thrift.transport.*;
/**
@@ -47,10 +51,30 @@ import org.apache.hadoop.util.Progressable;
*
* @see ColumnFamilyOutputFormat
*/
-final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<ByteBuffer, List<Mutation>>
+@Deprecated
+final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>> implements
+ org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>>
{
+ // The configuration this writer is associated with.
+ protected final Configuration conf;
+
+ // The number of mutations to buffer per endpoint
+ protected final int queueSize;
+
+ protected final long batchThreshold;
+
+ protected final ConsistencyLevel consistencyLevel;
+ protected Progressable progressable;
+ protected TaskAttemptContext context;
// handles for clients for each range running in the threadpool
private final Map<Range, RangeClient> clients;
+
+ // The ring cache that describes the token ranges each node in the ring is
+ // responsible for. This is what allows us to group the mutations by
+ // the endpoints they should be targeted at. The targeted endpoint
+ // essentially
+ // acts as the primary replica for the rows being affected by the mutations.
+ private final RingCache ringCache;
/**
* Upon construction, obtain the map that this writer will use to collect
@@ -73,11 +97,33 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
ColumnFamilyRecordWriter(Configuration conf)
{
- super(conf);
+ this.conf = conf;
+ this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
+ batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
+ consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
+ this.ringCache = new RingCache(conf);
this.clients = new HashMap<Range, RangeClient>();
}
-
- @Override
+
+ /**
+ * Close this <code>RecordWriter</code> to future operations, but not before
+ * flushing out the batched mutations.
+ *
+ * @param context the context of the task
+ * @throws IOException
+ */
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ close();
+ }
+
+ /** Fills the deprecated RecordWriter interface for streaming. */
+ @Deprecated
+ public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+ {
+ close();
+ }
+
public void close() throws IOException
{
// close all the clients before throwing anything
@@ -138,8 +184,20 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
* A client that runs in a threadpool and connects to the list of endpoints for a particular
* range. Mutations for keys in that range are sent to this client via a queue.
*/
- public class RangeClient extends AbstractRangeClient<Pair<ByteBuffer, Mutation>>
+ public class RangeClient extends Thread
{
+ // The list of endpoints for this range
+ protected final List<InetAddress> endpoints;
+ // A bounded queue of incoming mutations for this range
+ protected final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<>(queueSize);
+
+ protected volatile boolean run = true;
+ // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
+ // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls,
+ // when the client is closed.
+ protected volatile IOException lastException;
+
+ protected Cassandra.Client client;
public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);
/**
@@ -148,8 +206,58 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
*/
public RangeClient(List<InetAddress> endpoints)
{
- super(endpoints);
+ super("client-" + endpoints);
+ this.endpoints = endpoints;
}
+
+ /**
+ * enqueues the given value to Cassandra
+ */
+ public void put(Pair<ByteBuffer, Mutation> value) throws IOException
+ {
+ while (true)
+ {
+ if (lastException != null)
+ throw lastException;
+ try
+ {
+ if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
+ break;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+ }
+
+ public void close() throws IOException
+ {
+ // stop the run loop. this will result in closeInternal being called by the time join() finishes.
+ run = false;
+ interrupt();
+ try
+ {
+ this.join();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ if (lastException != null)
+ throw lastException;
+ }
+
+ protected void closeInternal()
+ {
+ if (client != null)
+ {
+ TTransport transport = client.getOutputProtocol().getTransport();
+ if (transport.isOpen())
+ transport.close();
+ }
+ }
/**
* Loops collecting mutations from the queue and sending to Cassandra
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
index 78080e2..3899f8c 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@ -22,11 +22,14 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.cassandra.hadoop.AbstractBulkOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
@@ -35,7 +38,7 @@ import org.apache.hadoop.util.Progressable;
* The <code>CqlBulkOutputFormat</code> acts as a Hadoop-specific
* OutputFormat that allows reduce tasks to store keys (and corresponding
* bound variable values) as CQL rows (and respective columns) in a given
- * ColumnFamily.
+ * table.
*
* <p>
* As is the case with the {@link org.apache.cassandra.hadoop.cql3.CqlOutputFormat},
@@ -48,13 +51,14 @@ import org.apache.hadoop.util.Progressable;
* simple.
* </p>
*/
-public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<ByteBuffer>>
+public class CqlBulkOutputFormat extends OutputFormat<Object, List<ByteBuffer>>
+ implements org.apache.hadoop.mapred.OutputFormat<Object, List<ByteBuffer>>
{
- private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.columnfamily.schema.";
- private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.columnfamily.insert.";
+ private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.table.schema.";
+ private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.table.insert.";
private static final String DELETE_SOURCE = "cassandra.output.delete.source";
- private static final String COLUMNFAMILY_ALIAS_PREFIX = "cqlbulkoutputformat.columnfamily.alias.";
+ private static final String TABLE_ALIAS_PREFIX = "cqlbulkoutputformat.table.alias.";
/** Fills the deprecated OutputFormat interface for streaming. */
@Deprecated
@@ -75,33 +79,60 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
{
return new CqlBulkRecordWriter(context);
}
+
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ {
+ checkOutputSpecs(HadoopCompat.getConfiguration(context));
+ }
+
+ private void checkOutputSpecs(Configuration conf)
+ {
+ if (ConfigHelper.getOutputKeyspace(conf) == null)
+ {
+ throw new UnsupportedOperationException("you must set the keyspace with setTable()");
+ }
+ }
+
+ /** Fills the deprecated OutputFormat interface for streaming. */
+ @Deprecated
+ public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+ {
+ checkOutputSpecs(job);
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new NullOutputCommitter();
+ }
- public static void setColumnFamilySchema(Configuration conf, String columnFamily, String schema)
+ public static void setTableSchema(Configuration conf, String columnFamily, String schema)
{
conf.set(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily, schema);
}
- public static void setColumnFamilyInsertStatement(Configuration conf, String columnFamily, String insertStatement)
+ public static void setTableInsertStatement(Configuration conf, String columnFamily, String insertStatement)
{
conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
}
- public static String getColumnFamilySchema(Configuration conf, String columnFamily)
+ public static String getTableSchema(Configuration conf, String columnFamily)
{
String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);
if (schema == null)
{
- throw new UnsupportedOperationException("You must set the ColumnFamily schema using setColumnFamilySchema.");
+ throw new UnsupportedOperationException("You must set the Table schema using setTableSchema.");
}
return schema;
}
- public static String getColumnFamilyInsertStatement(Configuration conf, String columnFamily)
+ public static String getTableInsertStatement(Configuration conf, String columnFamily)
{
String insert = conf.get(OUTPUT_CQL_INSERT_PREFIX + columnFamily);
if (insert == null)
{
- throw new UnsupportedOperationException("You must set the ColumnFamily insert statement using setColumnFamilySchema.");
+ throw new UnsupportedOperationException("You must set the Table insert statement using setTableSchema.");
}
return insert;
}
@@ -116,13 +147,31 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
return conf.getBoolean(DELETE_SOURCE, false);
}
- public static void setColumnFamilyAlias(Configuration conf, String alias, String columnFamily)
+ public static void setTableAlias(Configuration conf, String alias, String columnFamily)
{
- conf.set(COLUMNFAMILY_ALIAS_PREFIX + alias, columnFamily);
+ conf.set(TABLE_ALIAS_PREFIX + alias, columnFamily);
}
- public static String getColumnFamilyForAlias(Configuration conf, String alias)
+ public static String getTableForAlias(Configuration conf, String alias)
{
- return conf.get(COLUMNFAMILY_ALIAS_PREFIX + alias);
+ return conf.get(TABLE_ALIAS_PREFIX + alias);
+ }
+
+ public static class NullOutputCommitter extends OutputCommitter
+ {
+ public void abortTask(TaskAttemptContext taskContext) { }
+
+ public void cleanupJob(JobContext jobContext) { }
+
+ public void commitTask(TaskAttemptContext taskContext) { }
+
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ {
+ return false;
+ }
+
+ public void setupJob(JobContext jobContext) { }
+
+ public void setupTask(TaskAttemptContext taskContext) { }
}
}
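With the renames above, a job would now configure schema and insert statements roughly as follows (the ks1.t1 schema and insert strings are illustrative placeholders, not taken from this commit):

import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
import org.apache.hadoop.conf.Configuration;

public final class CqlBulkSetup
{
    public static Configuration configure()
    {
        Configuration conf = new Configuration();
        CqlBulkOutputFormat.setTableSchema(conf, "t1",
            "CREATE TABLE ks1.t1 (k int PRIMARY KEY, v text)");
        CqlBulkOutputFormat.setTableInsertStatement(conf, "t1",
            "INSERT INTO ks1.t1 (k, v) VALUES (?, ?)");
        // optional: route writes addressed to an alias onto the real table
        CqlBulkOutputFormat.setTableAlias(conf, "t1_alias", "t1");
        return conf;
    }
}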
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index 60cd511..e77c4c8 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -17,17 +17,22 @@
*/
package org.apache.cassandra.hadoop.cql3;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
import org.apache.cassandra.hadoop.BulkRecordWriter;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.HadoopCompat;
@@ -35,11 +40,12 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.NativeSSTableLoaderClient;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
-
/**
* The <code>CqlBulkRecordWriter</code> maps the output <key, value>
* pairs to a Cassandra column family. In particular, it applies the bound variables
@@ -54,10 +60,26 @@ import org.apache.hadoop.util.Progressable;
*
* @see CqlBulkOutputFormat
*/
-public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<ByteBuffer>>
+public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
+ implements org.apache.hadoop.mapred.RecordWriter<Object, List<ByteBuffer>>
{
+ public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+ public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+ public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+ public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+
+ private final Logger logger = LoggerFactory.getLogger(CqlBulkRecordWriter.class);
+
+ protected final Configuration conf;
+ protected final int maxFailures;
+ protected final int bufferSize;
+ protected Closeable writer;
+ protected SSTableLoader loader;
+ protected Progressable progress;
+ protected TaskAttemptContext context;
+
private String keyspace;
- private String columnFamily;
+ private String table;
private String schema;
private String insertStatement;
private File outputDir;
@@ -65,19 +87,25 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
CqlBulkRecordWriter(TaskAttemptContext context) throws IOException
{
- super(context);
+ this(HadoopCompat.getConfiguration(context));
+ this.context = context;
setConfigs();
}
CqlBulkRecordWriter(Configuration conf, Progressable progress) throws IOException
{
- super(conf, progress);
+ this(conf);
+ this.progress = progress;
setConfigs();
}
CqlBulkRecordWriter(Configuration conf) throws IOException
{
- super(conf);
+ Config.setOutboundBindAny(true);
+ this.conf = conf;
+ DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
+ maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
+ bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
setConfigs();
}
@@ -85,54 +113,55 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
{
// if anything is missing, exceptions will be thrown here, instead of on write()
keyspace = ConfigHelper.getOutputKeyspace(conf);
- columnFamily = ConfigHelper.getOutputColumnFamily(conf);
+ table = ConfigHelper.getOutputColumnFamily(conf);
- // check if columnFamily is aliased
- String aliasedCf = CqlBulkOutputFormat.getColumnFamilyForAlias(conf, columnFamily);
+ // check if table is aliased
+ String aliasedCf = CqlBulkOutputFormat.getTableForAlias(conf, table);
if (aliasedCf != null)
- columnFamily = aliasedCf;
+ table = aliasedCf;
- schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily);
- insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily);
- outputDir = getColumnFamilyDirectory();
+ schema = CqlBulkOutputFormat.getTableSchema(conf, table);
+ insertStatement = CqlBulkOutputFormat.getTableInsertStatement(conf, table);
+ outputDir = getTableDirectory();
deleteSrc = CqlBulkOutputFormat.getDeleteSourceOnSuccess(conf);
}
-
+ protected String getOutputLocation() throws IOException
+ {
+ String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
+ if (dir == null)
+            throw new IOException("Output directory not defined; if Hadoop is not setting java.io.tmpdir, define " + OUTPUT_LOCATION);
+ return dir;
+ }
+
private void prepareWriter() throws IOException
{
- try
+ if (writer == null)
{
- if (writer == null)
- {
- writer = CQLSSTableWriter.builder()
- .forTable(schema)
- .using(insertStatement)
- .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
- .inDirectory(outputDir)
- .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
- .build();
- }
- if (loader == null)
- {
- ExternalClient externalClient = new ExternalClient(conf);
-
- externalClient.addKnownCfs(keyspace, schema);
-
- this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) {
- @Override
- public void onSuccess(StreamState finalState)
- {
- if (deleteSrc)
- FileUtils.deleteRecursive(outputDir);
- }
- };
- }
+ writer = CQLSSTableWriter.builder()
+ .forTable(schema)
+ .using(insertStatement)
+ .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
+ .inDirectory(outputDir)
+ .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
+ .build();
}
- catch (Exception e)
+
+ if (loader == null)
{
- throw new IOException(e);
- }
+ ExternalClient externalClient = new ExternalClient(conf);
+ externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace));
+
+ loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler())
+ {
+ @Override
+ public void onSuccess(StreamState finalState)
+ {
+ if (deleteSrc)
+ FileUtils.deleteRecursive(outputDir);
+ }
+ };
+ }
}
/**
@@ -168,9 +197,9 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
}
}
- private File getColumnFamilyDirectory() throws IOException
+ private File getTableDirectory() throws IOException
{
- File dir = new File(String.format("%s%s%s%s%s-%s", getOutputLocation(), File.separator, keyspace, File.separator, columnFamily, UUID.randomUUID().toString()));
+ File dir = new File(String.format("%s%s%s%s%s-%s", getOutputLocation(), File.separator, keyspace, File.separator, table, UUID.randomUUID().toString()));
if (!dir.exists() && !dir.mkdirs())
{
@@ -179,41 +208,83 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
return dir;
}
-
- public static class ExternalClient extends AbstractBulkRecordWriter.ExternalClient
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException
{
- private Map<String, Map<String, CFMetaData>> knownCqlCfs = new HashMap<>();
-
- public ExternalClient(Configuration conf)
- {
- super(conf);
- }
+ close();
+ }
+
+ /** Fills the deprecated RecordWriter interface for streaming. */
+ @Deprecated
+ public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+ {
+ close();
+ }
- public void addKnownCfs(String keyspace, String cql)
+ private void close() throws IOException
+ {
+ if (writer != null)
{
- Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
-
- if (cfs == null)
+ writer.close();
+ Future<StreamState> future = loader.stream();
+ while (true)
{
- cfs = new HashMap<>();
- knownCqlCfs.put(keyspace, cfs);
+ try
+ {
+ future.get(1000, TimeUnit.MILLISECONDS);
+ break;
+ }
+                catch (ExecutionException e)
+                {
+                    // a failed future stays failed; rethrow instead of polling it forever
+                    throw new IOException(e);
+                }
+                catch (TimeoutException te)
+                {
+                    if (null != progress)
+                        progress.progress();
+                    if (null != context)
+                        HadoopCompat.progress(context);
+                }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
+ }
+ }
+ if (loader.getFailedHosts().size() > 0)
+ {
+ if (loader.getFailedHosts().size() > maxFailures)
+ throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
+ else
+ logger.warn("Some hosts failed: {}", loader.getFailedHosts());
}
-
- CFMetaData metadata = CFMetaData.compile(cql, keyspace);
- cfs.put(metadata.cfName, metadata);
}
-
- @Override
- public CFMetaData getCFMetaData(String keyspace, String cfName)
+ }
+
+ public static class ExternalClient extends NativeSSTableLoaderClient
+ {
+ public ExternalClient(Configuration conf)
{
- CFMetaData metadata = super.getCFMetaData(keyspace, cfName);
- if (metadata != null)
+ super(resolveHostAddresses(conf),
+ CqlConfigHelper.getOutputNativePort(conf),
+ ConfigHelper.getOutputKeyspaceUserName(conf),
+ ConfigHelper.getOutputKeyspacePassword(conf),
+ CqlConfigHelper.getSSLOptions(conf).orNull());
+ }
+
+ private static Collection<InetAddress> resolveHostAddresses(Configuration conf)
+ {
+ Set<InetAddress> addresses = new HashSet<>();
+
+ for (String host : ConfigHelper.getOutputInitialAddress(conf).split(","))
{
- return metadata;
+ try
+ {
+ addresses.add(InetAddress.getByName(host));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
}
-
- Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
- return cfs != null ? cfs.get(cfName) : null;
+
+ return addresses;
}
}
}
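
The four configuration keys that previously lived in AbstractBulkRecordWriter are now owned by CqlBulkRecordWriter itself. A hedged sketch of setting them on a Hadoop Configuration, with illustrative values matching the fallback defaults in the code above:

    import org.apache.hadoop.conf.Configuration;

    public class BulkOutputConfSketch
    {
        public static void main(String[] args)
        {
            Configuration conf = new Configuration();
            // Local staging directory for sstables (falls back to java.io.tmpdir).
            conf.set("mapreduce.output.bulkoutputformat.localdir", "/tmp/bulk-staging");
            // CQLSSTableWriter buffer size in megabytes (default 64).
            conf.set("mapreduce.output.bulkoutputformat.buffersize", "64");
            // Outbound stream throttle in megabits per second; 0 means unthrottled.
            conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "0");
            // Number of stream-target hosts allowed to fail before close() throws.
            conf.set("mapreduce.output.bulkoutputformat.maxfailedhosts", "0");
        }
    }
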
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index ac5a7e5..3033fa6 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -34,22 +34,23 @@ import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.io.util.FileUtils;
+import com.google.common.base.Optional;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import com.datastax.driver.core.AuthProvider;
-import com.datastax.driver.core.PlainTextAuthProvider;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.policies.LoadBalancingPolicy;
-import com.google.common.base.Optional;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
public class CqlConfigHelper
{
@@ -84,6 +85,7 @@ public class CqlConfigHelper
private static final String INPUT_NATIVE_PROTOCOL_VERSION = "cassandra.input.native.protocol.version";
private static final String OUTPUT_CQL = "cassandra.output.cql";
+ private static final String OUTPUT_NATIVE_PORT = "cassandra.output.native.port";
/**
* Set the CQL columns for the input of this job.
@@ -176,6 +178,11 @@ public class CqlConfigHelper
return Integer.parseInt(conf.get(INPUT_NATIVE_PORT, "9042"));
}
+ public static int getOutputNativePort(Configuration conf)
+ {
+ return Integer.parseInt(conf.get(OUTPUT_NATIVE_PORT, "9042"));
+ }
+
public static Optional<Integer> getInputMinSimultReqPerConnections(Configuration conf)
{
return getIntSetting(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, conf);
@@ -294,6 +301,22 @@ public class CqlConfigHelper
public static Cluster getInputCluster(String[] hosts, Configuration conf)
{
int port = getInputNativePort(conf);
+ return getCluster(hosts, conf, port);
+ }
+
+ public static Cluster getOutputCluster(String host, Configuration conf)
+ {
+ return getOutputCluster(new String[]{host}, conf);
+ }
+
+ public static Cluster getOutputCluster(String[] hosts, Configuration conf)
+ {
+ int port = getOutputNativePort(conf);
+ return getCluster(hosts, conf, port);
+ }
+
+ public static Cluster getCluster(String[] hosts, Configuration conf, int port)
+ {
Optional<AuthProvider> authProvider = getAuthProvider(conf);
Optional<SSLOptions> sslOptions = getSSLOptions(conf);
Optional<Integer> protocolVersion = getProtocolVersion(conf);
@@ -301,11 +324,11 @@ public class CqlConfigHelper
SocketOptions socketOptions = getReadSocketOptions(conf);
QueryOptions queryOptions = getReadQueryOptions(conf);
PoolingOptions poolingOptions = getReadPoolingOptions(conf);
-
+
Cluster.Builder builder = Cluster.builder()
- .addContactPoints(hosts)
- .withPort(port)
- .withCompression(ProtocolOptions.Compression.NONE);
+ .addContactPoints(hosts)
+ .withPort(port)
+ .withCompression(ProtocolOptions.Compression.NONE);
if (authProvider.isPresent())
builder.withAuthProvider(authProvider.get());
@@ -316,14 +339,13 @@ public class CqlConfigHelper
builder.withProtocolVersion(protocolVersion.get());
}
builder.withLoadBalancingPolicy(loadBalancingPolicy)
- .withSocketOptions(socketOptions)
- .withQueryOptions(queryOptions)
- .withPoolingOptions(poolingOptions);
+ .withSocketOptions(socketOptions)
+ .withQueryOptions(queryOptions)
+ .withPoolingOptions(poolingOptions);
return builder.build();
}
-
public static void setInputCoreConnections(Configuration conf, String connections)
{
conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections);
@@ -502,7 +524,7 @@ public class CqlConfigHelper
return Optional.of(getClientAuthProvider(authProvider.get(), conf));
}
- private static Optional<SSLOptions> getSSLOptions(Configuration conf)
+ public static Optional<SSLOptions> getSSLOptions(Configuration conf)
{
Optional<String> truststorePath = getInputNativeSSLTruststorePath(conf);
Optional<String> keystorePath = getInputNativeSSLKeystorePath(conf);
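
With getOutputNativePort() and the now-public getSSLOptions(), the output side can build a driver Cluster the same way the input side always has. A minimal sketch, assuming a local contact point and a placeholder keyspace:

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Session;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
    import org.apache.hadoop.conf.Configuration;

    public class OutputClusterSketch
    {
        public static void main(String[] args)
        {
            Configuration conf = new Configuration();
            ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1"); // placeholder node
            conf.set("cassandra.output.native.port", "9042");        // OUTPUT_NATIVE_PORT

            try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
                 Session session = cluster.connect("my_keyspace"))   // placeholder keyspace
            {
                session.execute("SELECT release_version FROM system.local");
            }
        }
    }
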
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
index 0d09ca2..9a1cda6 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
@@ -23,15 +23,15 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
-import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat;
-import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.*;
+import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapreduce.*;
/**
- * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
+ * The <code>CqlOutputFormat</code> acts as a Hadoop-specific
* OutputFormat that allows reduce tasks to store keys (and corresponding
* bound variable values) as CQL rows (and respective columns) in a given
- * ColumnFamily.
+ * table.
*
* <p>
* As is the case with the {@link org.apache.cassandra.hadoop.ColumnFamilyInputFormat},
@@ -52,8 +52,51 @@ import org.apache.hadoop.mapreduce.*;
* to Cassandra.
* </p>
*/
-public class CqlOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
-{
+public class CqlOutputFormat extends OutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
+ implements org.apache.hadoop.mapred.OutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
+{
+ /**
+ * Check for validity of the output-specification for the job.
+ *
+ * @param context
+ * information about the job
+ */
+ public void checkOutputSpecs(JobContext context)
+ {
+ checkOutputSpecs(HadoopCompat.getConfiguration(context));
+ }
+
+ protected void checkOutputSpecs(Configuration conf)
+ {
+ if (ConfigHelper.getOutputKeyspace(conf) == null)
+ throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
+ if (ConfigHelper.getOutputPartitioner(conf) == null)
+ throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
+ if (ConfigHelper.getOutputInitialAddress(conf) == null)
+ throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
+ }
+
+ /** Fills the deprecated OutputFormat interface for streaming. */
+ @Deprecated
+ public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+ {
+ checkOutputSpecs(job);
+ }
+
+ /**
+ * The OutputCommitter for this format does not write any data to the DFS.
+ *
+ * @param context
+ * the task context
+ * @return an output committer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new NullOutputCommitter();
+ }
+
/** Fills the deprecated OutputFormat interface for streaming. */
@Deprecated
public CqlRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
@@ -73,4 +116,25 @@ public class CqlOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String
{
return new CqlRecordWriter(context);
}
+
+ /**
+ * An {@link OutputCommitter} that does nothing.
+ */
+ private static class NullOutputCommitter extends OutputCommitter
+ {
+ public void abortTask(TaskAttemptContext taskContext) { }
+
+ public void cleanupJob(JobContext jobContext) { }
+
+ public void commitTask(TaskAttemptContext taskContext) { }
+
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ {
+ return false;
+ }
+
+ public void setupJob(JobContext jobContext) { }
+
+ public void setupTask(TaskAttemptContext taskContext) { }
+ }
}
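
checkOutputSpecs() replaces the check that used to live in the removed AbstractColumnFamilyOutputFormat. The three settings it verifies can be supplied as below; the values are placeholders:

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.hadoop.conf.Configuration;

    public class OutputSpecsSketch
    {
        public static void main(String[] args)
        {
            Configuration conf = new Configuration();
            ConfigHelper.setOutputKeyspace(conf, "my_keyspace");           // target keyspace
            ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner"); // must match the cluster
            ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");       // any reachable node
        }
    }
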
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 308bdf8..4a7bd59 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -37,13 +37,15 @@ import org.slf4j.LoggerFactory;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.Token;
import com.datastax.driver.core.TupleValue;
import com.datastax.driver.core.UDTValue;
-import org.apache.cassandra.schema.LegacySchemaTables;
-import org.apache.cassandra.db.SystemKeyspace;
+import com.google.common.reflect.TypeToken;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.hadoop.ColumnFamilySplit;
@@ -493,36 +495,72 @@ public class CqlRecordReader extends RecordReader<Long, Row>
}
@Override
+ public <T> List<T> getList(int i, TypeToken<T> typeToken)
+ {
+ return row.getList(i, typeToken);
+ }
+
+ @Override
public <T> List<T> getList(String name, Class<T> elementsClass)
{
return row.getList(name, elementsClass);
}
@Override
+ public <T> List<T> getList(String s, TypeToken<T> typeToken)
+ {
+ return row.getList(s, typeToken);
+ }
+
+ @Override
public <T> Set<T> getSet(int i, Class<T> elementsClass)
{
return row.getSet(i, elementsClass);
}
@Override
+ public <T> Set<T> getSet(int i, TypeToken<T> typeToken)
+ {
+ return row.getSet(i, typeToken);
+ }
+
+ @Override
public <T> Set<T> getSet(String name, Class<T> elementsClass)
{
return row.getSet(name, elementsClass);
}
@Override
+ public <T> Set<T> getSet(String s, TypeToken<T> typeToken)
+ {
+ return row.getSet(s, typeToken);
+ }
+
+ @Override
public <K, V> Map<K, V> getMap(int i, Class<K> keysClass, Class<V> valuesClass)
{
return row.getMap(i, keysClass, valuesClass);
}
@Override
+ public <K, V> Map<K, V> getMap(int i, TypeToken<K> typeToken, TypeToken<V> typeToken1)
+ {
+ return row.getMap(i, typeToken, typeToken1);
+ }
+
+ @Override
public <K, V> Map<K, V> getMap(String name, Class<K> keysClass, Class<V> valuesClass)
{
return row.getMap(name, keysClass, valuesClass);
}
@Override
+ public <K, V> Map<K, V> getMap(String s, TypeToken<K> typeToken, TypeToken<V> typeToken1)
+ {
+ return row.getMap(s, typeToken, typeToken1);
+ }
+
+ @Override
public UDTValue getUDTValue(int i)
{
return row.getUDTValue(i);
@@ -545,6 +583,24 @@ public class CqlRecordReader extends RecordReader<Long, Row>
{
return row.getTupleValue(name);
}
+
+ @Override
+ public Token getToken(int i)
+ {
+ return row.getToken(i);
+ }
+
+ @Override
+ public Token getToken(String name)
+ {
+ return row.getToken(name);
+ }
+
+ @Override
+ public Token getPartitionKeyToken()
+ {
+ return row.getPartitionKeyToken();
+ }
}
/**
@@ -604,36 +660,21 @@ public class CqlRecordReader extends RecordReader<Long, Row>
private void fetchKeys()
{
- String query = String.format("SELECT column_name, component_index, type " +
- "FROM %s.%s " +
- "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
- SystemKeyspace.NAME,
- LegacySchemaTables.COLUMNS,
- keyspace,
- cfName);
-
// get CF meta data
- List<Row> rows = session.execute(query).all();
- if (rows.isEmpty())
+ TableMetadata tableMetadata = session.getCluster()
+ .getMetadata()
+ .getKeyspace(Metadata.quote(keyspace))
+ .getTable(Metadata.quote(cfName));
+ if (tableMetadata == null)
{
throw new RuntimeException("No table metadata found for " + keyspace + "." + cfName);
}
- int numberOfPartitionKeys = 0;
- for (Row row : rows)
- if (row.getString(2).equals("partition_key"))
- numberOfPartitionKeys++;
- String[] partitionKeyArray = new String[numberOfPartitionKeys];
- for (Row row : rows)
+        // Here we assume that tableMetadata.getPartitionKey() always
+        // returns the list of columns in order of component_index.
+ for (ColumnMetadata partitionKey : tableMetadata.getPartitionKey())
{
- String type = row.getString(2);
- String column = row.getString(0);
- if (type.equals("partition_key"))
- {
- int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
- partitionKeyArray[componentIndex] = column;
- }
+ partitionKeys.add(partitionKey.getName());
}
- partitionKeys.addAll(Arrays.asList(partitionKeyArray));
}
private String quote(String identifier)
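
fetchKeys() above swaps the legacy schema-table query for the driver's cluster metadata, relying on getPartitionKey() returning columns already ordered by component index. A condensed sketch of the same lookup, assuming a hypothetical ks.t table on a local node:

    import java.util.ArrayList;
    import java.util.List;
    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.ColumnMetadata;
    import com.datastax.driver.core.Metadata;
    import com.datastax.driver.core.TableMetadata;

    public class PartitionKeySketch
    {
        public static void main(String[] args)
        {
            try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build())
            {
                TableMetadata table = cluster.getMetadata()
                                             .getKeyspace(Metadata.quote("ks"))
                                             .getTable(Metadata.quote("t"));
                List<String> partitionKeys = new ArrayList<>();
                for (ColumnMetadata pk : table.getPartitionKey()) // ordered by component index
                    partitionKeys.add(pk.getName());
                System.out.println(partitionKeys);
            }
        }
    }
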
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index dbbeb47..1d8436b 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -21,37 +21,39 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.schema.LegacySchemaTables;
-import org.apache.cassandra.db.SystemKeyspace;
+
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.TokenRange;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat;
-import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.hadoop.HadoopCompat;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.utils.*;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.Progressable;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
/**
- * The <code>ColumnFamilyRecordWriter</code> maps the output <key, value>
- * pairs to a Cassandra column family. In particular, it applies the binded variables
+ * The <code>CqlRecordWriter</code> maps the output <key, value>
+ * pairs to a Cassandra table. In particular, it applies the bound variables
* in the value to the prepared statement, which it associates with the key, and in
* turn the responsible endpoint.
*
@@ -63,21 +65,38 @@ import org.apache.thrift.transport.TTransport;
*
* @see CqlOutputFormat
*/
-class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> implements AutoCloseable
+class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> implements
+ org.apache.hadoop.mapred.RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>>, AutoCloseable
{
private static final Logger logger = LoggerFactory.getLogger(CqlRecordWriter.class);
+ // The configuration this writer is associated with.
+ protected final Configuration conf;
+ // The number of mutations to buffer per endpoint
+ protected final int queueSize;
+
+ protected final long batchThreshold;
+
+ protected Progressable progressable;
+ protected TaskAttemptContext context;
+
+    // The ring cache that describes the token ranges each node in the ring is
+    // responsible for. This is what allows us to group the mutations by the
+    // endpoints they should be targeted at. The targeted endpoint essentially
+    // acts as the primary replica for the rows being affected by the mutations.
+ private final NativeRingCache ringCache;
+
// handles for clients for each range running in the threadpool
protected final Map<InetAddress, RangeClient> clients;
    // host to prepared statement mappings
- protected final ConcurrentHashMap<Cassandra.Client, Integer> preparedStatements = new ConcurrentHashMap<Cassandra.Client, Integer>();
+ protected final ConcurrentHashMap<Session, PreparedStatement> preparedStatements = new ConcurrentHashMap<Session, PreparedStatement>();
protected final String cql;
- protected AbstractType<?> keyValidator;
- protected String [] partitionKeyColumns;
- protected List<String> clusterColumns;
+ protected List<ColumnMetadata> partitionKeyColumns;
+ protected List<ColumnMetadata> clusterColumns;
/**
* Upon construction, obtain the map that this writer will use to collect
@@ -100,28 +119,28 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
CqlRecordWriter(Configuration conf)
{
- super(conf);
+ this.conf = conf;
+ this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
+ batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
this.clients = new HashMap<>();
try
{
- Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
+ String keyspace = ConfigHelper.getOutputKeyspace(conf);
+ Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace);
+ ringCache = new NativeRingCache(conf);
if (client != null)
{
- client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
- String user = ConfigHelper.getOutputKeyspaceUserName(conf);
- String password = ConfigHelper.getOutputKeyspacePassword(conf);
- if ((user != null) && (password != null))
- AbstractColumnFamilyOutputFormat.login(user, password, client);
- retrievePartitionKeyValidator(client);
+ TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
+ clusterColumns = tableMetadata.getClusteringColumns();
+ partitionKeyColumns = tableMetadata.getPartitionKey();
+
String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
if (cqlQuery.toLowerCase().startsWith("insert"))
                    throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported; please use an UPDATE or DELETE statement");
cql = appendKeyWhereClauses(cqlQuery);
- TTransport transport = client.getOutputProtocol().getTransport();
- if (transport.isOpen())
- transport.close();
+ client.close();
}
else
{
@@ -133,7 +152,26 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
throw new RuntimeException(e);
}
}
-
+
+ /**
+ * Close this <code>RecordWriter</code> to future operations, but not before
+ * flushing out the batched mutations.
+ *
+ * @param context the context of the task
+ * @throws IOException
+ */
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ close();
+ }
+
+ /** Fills the deprecated RecordWriter interface for streaming. */
+ @Deprecated
+ public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+ {
+ close();
+ }
+
@Override
public void close() throws IOException
{
@@ -157,7 +195,7 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
/**
* If the key is to be associated with a valid value, a mutation is created
- * for it with the given column family and columns. In the event the value
+ * for it with the given table and columns. In the event the value
* in the column is missing (i.e., null), then it is marked for
* {@link Deletion}. Similarly, if the entire value for a key is missing
* (i.e., null), then the entire key is marked for {@link Deletion}.
@@ -172,25 +210,25 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
@Override
public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException
{
- Range<Token> range = ringCache.getRange(getPartitionKey(keyColumns));
+ TokenRange range = ringCache.getRange(getPartitionKey(keyColumns));
// get the client for the given range, or create a new one
- final InetAddress address = ringCache.getEndpoint(range).get(0);
+ final InetAddress address = ringCache.getEndpoints(range).get(0);
RangeClient client = clients.get(address);
if (client == null)
{
// haven't seen keys for this range: create new client
- client = new RangeClient(ringCache.getEndpoint(range));
+ client = new RangeClient(ringCache.getEndpoints(range));
client.start();
clients.put(address, client);
}
// add primary key columns to the bind variables
List<ByteBuffer> allValues = new ArrayList<ByteBuffer>(values);
- for (String column : partitionKeyColumns)
- allValues.add(keyColumns.get(column));
- for (String column : clusterColumns)
- allValues.add(keyColumns.get(column));
+ for (ColumnMetadata column : partitionKeyColumns)
+ allValues.add(keyColumns.get(column.getName()));
+ for (ColumnMetadata column : clusterColumns)
+ allValues.add(keyColumns.get(column.getName()));
client.put(allValues);
@@ -204,16 +242,50 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
* A client that runs in a threadpool and connects to the list of endpoints for a particular
* range. Bound variables for keys in that range are sent to this client via a queue.
*/
- public class RangeClient extends AbstractRangeClient<List<ByteBuffer>>
+ public class RangeClient extends Thread
{
+ // The list of endpoints for this range
+ protected final List<InetAddress> endpoints;
+ protected Session client;
+ // A bounded queue of incoming mutations for this range
+ protected final BlockingQueue<List<ByteBuffer>> queue = new ArrayBlockingQueue<List<ByteBuffer>>(queueSize);
+
+ protected volatile boolean run = true;
+        // We want the caller to know if something went wrong, so we record any unrecoverable exception while writing
+        // and rethrow it on the caller's stack on the next put() call or, if there are no more put calls,
+        // when the client is closed.
+ protected volatile IOException lastException;
+
/**
         * Constructs a {@link RangeClient} for the given endpoints.
* @param endpoints the possible endpoints to execute the mutations on
*/
public RangeClient(List<InetAddress> endpoints)
{
- super(endpoints);
- }
+ super("client-" + endpoints);
+ this.endpoints = endpoints;
+ }
+
+ /**
+         * Enqueues the given value to be written to Cassandra.
+ */
+ public void put(List<ByteBuffer> value) throws IOException
+ {
+ while (true)
+ {
+ if (lastException != null)
+ throw lastException;
+ try
+ {
+ if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
+ break;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+ }
/**
* Loops collecting cql binded variable values from the queue and sending to Cassandra
@@ -234,156 +306,138 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
continue;
}
- Iterator<InetAddress> iter = endpoints.iterator();
+ ListIterator<InetAddress> iter = endpoints.listIterator();
while (true)
{
                    // try each endpoint in turn: connect first, then drain queued mutations against it
+
+ // attempt to connect to a different endpoint
try
{
- int i = 0;
- int itemId = preparedStatement(client);
- while (bindVariables != null)
- {
- client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE);
- i++;
-
- if (i >= batchThreshold)
- break;
-
- bindVariables = queue.poll();
- }
-
- break;
+ InetAddress address = iter.next();
+ String host = address.getHostName();
+ client = CqlConfigHelper.getOutputCluster(host, conf).connect();
}
catch (Exception e)
{
+                    // If the connection attempt died due to an interrupt, record it and retry the same endpoint.
+                    if (Thread.interrupted())
+                    {
+                        lastException = new IOException(e);
+                        iter.previous();
+                    }
closeInternal();
- if (!iter.hasNext())
+
+                    // Most exceptions mean something unexpected went wrong with that endpoint, so
+                    // we should try another. Auth and invalid-request exceptions are fatal.
+ if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext())
{
lastException = new IOException(e);
break outer;
}
}
- // attempt to connect to a different endpoint
try
{
- InetAddress address = iter.next();
- String host = address.getHostName();
- int port = ConfigHelper.getOutputRpcPort(conf);
- client = CqlOutputFormat.createAuthenticatedClient(host, port, conf);
+ int i = 0;
+ PreparedStatement statement = preparedStatement(client);
+ while (bindVariables != null)
+ {
+ BoundStatement boundStatement = new BoundStatement(statement);
+ for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+ {
+ boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+ }
+ client.execute(boundStatement);
+ i++;
+
+ if (i >= batchThreshold)
+ break;
+ bindVariables = queue.poll();
+ }
+ break;
}
catch (Exception e)
{
closeInternal();
- // TException means something unexpected went wrong to that endpoint, so
- // we should try again to another. Other exceptions (auth or invalid request) are fatal.
- if ((!(e instanceof TException)) || !iter.hasNext())
+ if (!iter.hasNext())
{
lastException = new IOException(e);
break outer;
}
}
+
}
}
-
// close all our connections once we are done.
closeInternal();
}
        /** Get the prepared statement from the cache, otherwise prepare it against the Cassandra server. */
- private int preparedStatement(Cassandra.Client client)
+ private PreparedStatement preparedStatement(Session client)
{
- Integer itemId = preparedStatements.get(client);
- if (itemId == null)
+ PreparedStatement statement = preparedStatements.get(client);
+ if (statement == null)
{
- CqlPreparedResult result;
+ PreparedStatement result;
try
{
- result = client.prepare_cql3_query(ByteBufferUtil.bytes(cql), Compression.NONE);
+ result = client.prepare(cql);
}
- catch (TException e)
+ catch (NoHostAvailableException e)
{
throw new RuntimeException("failed to prepare cql query " + cql, e);
}
- Integer previousId = preparedStatements.putIfAbsent(client, Integer.valueOf(result.itemId));
- itemId = previousId == null ? result.itemId : previousId;
+                PreparedStatement previous = preparedStatements.putIfAbsent(client, result);
+                statement = previous == null ? result : previous;
}
- return itemId;
+ return statement;
}
- }
- private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)
- {
- ByteBuffer partitionKey;
- if (keyValidator instanceof CompositeType)
+ public void close() throws IOException
{
- ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.length];
- for (int i = 0; i< keys.length; i++)
- keys[i] = keyColumns.get(partitionKeyColumns[i]);
+ // stop the run loop. this will result in closeInternal being called by the time join() finishes.
+ run = false;
+ interrupt();
+ try
+ {
+ this.join();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
- partitionKey = CompositeType.build(keys);
- }
- else
- {
- partitionKey = keyColumns.get(partitionKeyColumns[0]);
+ if (lastException != null)
+ throw lastException;
}
- return partitionKey;
- }
- // FIXME
- /** retrieve the key validator from system.schema_columnfamilies table */
- private void retrievePartitionKeyValidator(Cassandra.Client client) throws Exception
- {
- String keyspace = ConfigHelper.getOutputKeyspace(conf);
- String cfName = ConfigHelper.getOutputColumnFamily(conf);
- String query = String.format("SELECT key_validator, key_aliases, column_aliases " +
- "FROM %s.%s " +
- "WHERE keyspace_name = '%s' and columnfamily_name = '%s'",
- SystemKeyspace.NAME,
- LegacySchemaTables.COLUMNFAMILIES,
- keyspace,
- cfName);
- CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
- Column rawKeyValidator = result.rows.get(0).columns.get(0);
- String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue()));
- keyValidator = parseType(validator);
-
- Column rawPartitionKeys = result.rows.get(0).columns.get(1);
- String keyString = ByteBufferUtil.string(ByteBuffer.wrap(rawPartitionKeys.getValue()));
- logger.debug("partition keys: {}", keyString);
-
- List<String> keys = FBUtilities.fromJsonList(keyString);
- partitionKeyColumns = new String[keys.size()];
- int i = 0;
- for (String key : keys)
+
+ protected void closeInternal()
{
- partitionKeyColumns[i] = key;
- i++;
+ if (client != null)
+ {
+                client.close();
+ }
}
-
- Column rawClusterColumns = result.rows.get(0).columns.get(2);
- String clusterColumnString = ByteBufferUtil.string(ByteBuffer.wrap(rawClusterColumns.getValue()));
-
- logger.debug("cluster columns: {}", clusterColumnString);
- clusterColumns = FBUtilities.fromJsonList(clusterColumnString);
}
- private AbstractType<?> parseType(String type) throws ConfigurationException
+ private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)
{
- try
+ ByteBuffer partitionKey;
+ if (partitionKeyColumns.size() > 1)
{
- // always treat counters like longs, specifically CCT.serialize is not what we need
- if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
- return LongType.instance;
- return TypeParser.parse(type);
+ ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.size()];
+            for (int i = 0; i < keys.length; i++)
+ keys[i] = keyColumns.get(partitionKeyColumns.get(i).getName());
+
+ partitionKey = CompositeType.build(keys);
}
- catch (SyntaxException e)
+ else
{
- throw new ConfigurationException(e.getMessage(), e);
+ partitionKey = keyColumns.get(partitionKeyColumns.get(0).getName());
}
+ return partitionKey;
}
/**
@@ -393,10 +447,10 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
{
String keyWhereClause = "";
- for (String partitionKey : partitionKeyColumns)
- keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? quote(partitionKey) : (" AND " + quote(partitionKey)));
- for (String clusterColumn : clusterColumns)
- keyWhereClause += " AND " + quote(clusterColumn) + " = ?";
+ for (ColumnMetadata partitionKey : partitionKeyColumns)
+ keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? quote(partitionKey.getName()) : (" AND " + quote(partitionKey.getName())));
+ for (ColumnMetadata clusterColumn : clusterColumns)
+ keyWhereClause += " AND " + quote(clusterColumn.getName()) + " = ?";
return cqlQuery + " WHERE " + keyWhereClause;
}
@@ -406,4 +460,60 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
{
return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";
}
+
+ class NativeRingCache
+ {
+ private Map<TokenRange, Set<Host>> rangeMap;
+ private Metadata metadata;
+ private final IPartitioner partitioner;
+ private final Configuration conf;
+
+ public NativeRingCache(Configuration conf)
+ {
+ this.conf = conf;
+ this.partitioner = ConfigHelper.getOutputPartitioner(conf);
+ refreshEndpointMap();
+ }
+
+
+ private void refreshEndpointMap()
+ {
+ String keyspace = ConfigHelper.getOutputKeyspace(conf);
+ Session session = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace);
+ rangeMap = new HashMap<>();
+ metadata = session.getCluster().getMetadata();
+ Set<TokenRange> ranges = metadata.getTokenRanges();
+ for (TokenRange range : ranges)
+ {
+ rangeMap.put(range, metadata.getReplicas(keyspace, range));
+ }
+ }
+
+ public TokenRange getRange(ByteBuffer key)
+ {
+ Token t = partitioner.getToken(key);
+ com.datastax.driver.core.Token driverToken = metadata.newToken(partitioner.getTokenFactory().toString(t));
+ for (TokenRange range : rangeMap.keySet())
+ {
+ if (range.contains(driverToken))
+ {
+ return range;
+ }
+ }
+
+            throw new RuntimeException("Invalid token information returned by cluster metadata: " + rangeMap);
+ }
+
+ public List<InetAddress> getEndpoints(TokenRange range)
+ {
+            Set<Host> hosts = rangeMap.get(range);
+            List<InetAddress> addresses = new ArrayList<>(hosts.size());
+            for (Host host : hosts)
+            {
+                addresses.add(host.getAddress());
+            }
+ return addresses;
+ }
+ }
}
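
The write path now routes keys with the driver's token model instead of the Thrift describe_ring call: NativeRingCache converts Cassandra's own partitioner token to a driver Token through its string form and scans the cached ranges. The bridging step, condensed into a sketch (the method name rangeFor is illustrative, not part of the patch):

    import java.nio.ByteBuffer;
    import com.datastax.driver.core.Metadata;
    import com.datastax.driver.core.TokenRange;
    import org.apache.cassandra.dht.IPartitioner;
    import org.apache.cassandra.dht.Token;

    public class TokenBridgeSketch
    {
        // Condensed from NativeRingCache.getRange(); not a drop-in replacement.
        static TokenRange rangeFor(ByteBuffer key, IPartitioner partitioner, Metadata metadata)
        {
            Token t = partitioner.getToken(key);
            com.datastax.driver.core.Token driverToken =
                metadata.newToken(partitioner.getTokenFactory().toString(t));
            for (TokenRange range : metadata.getTokenRanges())
                if (range.contains(driverToken))
                    return range;
            throw new IllegalStateException("key's token not covered by any cached range");
        }
    }
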
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 0a64c87..1ad80b7 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -18,30 +18,46 @@
package org.apache.cassandra.hadoop.pig;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.*;
-import org.apache.cassandra.hadoop.HadoopCompat;
-import org.apache.cassandra.db.Cell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.auth.PasswordAuthenticator;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.*;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
@@ -52,7 +68,8 @@ import org.apache.thrift.protocol.TBinaryProtocol;
*
* A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
*/
-public class CassandraStorage extends AbstractCassandraStorage
+@Deprecated
+public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
{
public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
@@ -71,6 +88,28 @@ public class CassandraStorage extends AbstractCassandraStorage
private boolean widerows = false;
private int limit;
+
+ protected String DEFAULT_INPUT_FORMAT;
+ protected String DEFAULT_OUTPUT_FORMAT;
+
+ protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
+
+ protected String username;
+ protected String password;
+ protected String keyspace;
+ protected String column_family;
+ protected String loadSignature;
+ protected String storeSignature;
+
+ protected Configuration conf;
+ protected String inputFormatClass;
+ protected String outputFormatClass;
+ protected int splitSize = 64 * 1024;
+ protected String partitionerClass;
+ protected boolean usePartitionFilter = false;
+ protected String initHostAddress;
+ protected String rpcPort;
+ protected int nativeProtocolVersion = 1;
// wide row hacks
private ByteBuffer lastKey;
@@ -104,8 +143,7 @@ public class CassandraStorage extends AbstractCassandraStorage
/** read wide row*/
public Tuple getNextWide() throws IOException
{
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = null;
Tuple tuple = null;
DefaultDataBag bag = new DefaultDataBag();
@@ -128,7 +166,7 @@ public class CassandraStorage extends AbstractCassandraStorage
}
for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
@@ -166,7 +204,7 @@ public class CassandraStorage extends AbstractCassandraStorage
addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
lastKey = key;
@@ -183,14 +221,14 @@ public class CassandraStorage extends AbstractCassandraStorage
{
for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
}
for (Map.Entry<ByteBuffer, Cell> entry : row.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
}
}
@@ -200,7 +238,6 @@ public class CassandraStorage extends AbstractCassandraStorage
}
}
- @Override
/** read next row */
public Tuple getNext() throws IOException
{
@@ -212,8 +249,7 @@ public class CassandraStorage extends AbstractCassandraStorage
if (!reader.nextKeyValue())
return null;
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = reader.getCurrentKey();
Map<ByteBuffer, Cell> cf = reader.getCurrentValue();
assert key != null && cf != null;
@@ -240,7 +276,7 @@ public class CassandraStorage extends AbstractCassandraStorage
}
if (hasColumn)
{
- tuple.append(columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type())));
+ tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
}
else if (!cql3Table)
{ // otherwise, we need to add an empty tuple to take its place
@@ -252,7 +288,7 @@ public class CassandraStorage extends AbstractCassandraStorage
for (Map.Entry<ByteBuffer, Cell> entry : cf.entrySet())
{
if (!added.containsKey(entry.getKey()))
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
// finally, special top-level indexes if needed
@@ -260,7 +296,7 @@ public class CassandraStorage extends AbstractCassandraStorage
{
for (ColumnDef cdef : getIndexes())
{
- Tuple throwaway = columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type()));
+ Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
tuple.append(throwaway.get(1));
}
}
@@ -272,14 +308,57 @@ public class CassandraStorage extends AbstractCassandraStorage
}
}
+ /** write next row */
+ public void putNext(Tuple t) throws IOException
+ {
+ /*
+ We support two cases for output:
+ First, the original output:
+ (key, (name, value), (name,value), {(name,value)}) (tuples or bag is optional)
+ For supers, we only accept the original output.
+ */
+
+ if (t.size() < 1)
+ {
+ // simply nothing here, we can't even delete without a key
+ logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
+ return;
+ }
+ ByteBuffer key = objToBB(t.get(0));
+ if (t.getType(1) == DataType.TUPLE)
+ writeColumnsFromTuple(key, t, 1);
+ else if (t.getType(1) == DataType.BAG)
+ {
+ if (t.size() > 2)
+ throw new IOException("No arguments allowed after bag");
+ writeColumnsFromBag(key, (DataBag) t.get(1));
+ }
+ else
+ throw new IOException("Second argument in output must be a tuple or bag");
+ }
+
/** set hadoop cassandra connection settings */
protected void setConnectionInformation() throws IOException
{
- super.setConnectionInformation();
+ StorageHelper.setConnectionInformation(conf);
+ if (System.getenv(StorageHelper.PIG_INPUT_FORMAT) != null)
+ inputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_INPUT_FORMAT));
+ else
+ inputFormatClass = DEFAULT_INPUT_FORMAT;
+ if (System.getenv(StorageHelper.PIG_OUTPUT_FORMAT) != null)
+ outputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_OUTPUT_FORMAT));
+ else
+ outputFormatClass = DEFAULT_OUTPUT_FORMAT;
if (System.getenv(PIG_ALLOW_DELETES) != null)
allow_deletes = Boolean.parseBoolean(System.getenv(PIG_ALLOW_DELETES));
}
+ /** get the full class name */
+ protected String getFullyQualifiedClassName(String classname)
+ {
+ return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
+ }
+
/** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
@@ -296,11 +375,11 @@ public class CassandraStorage extends AbstractCassandraStorage
widerows = Boolean.parseBoolean(System.getenv(PIG_WIDEROW_INPUT));
if (System.getenv(PIG_USE_SECONDARY) != null)
usePartitionFilter = Boolean.parseBoolean(System.getenv(PIG_USE_SECONDARY));
- if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+ if (System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE) != null)
{
try
{
- ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+ ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE)));
}
catch (NumberFormatException e)
{
@@ -380,12 +459,67 @@ public class CassandraStorage extends AbstractCassandraStorage
initSchema(storeSignature);
}
+ /** Methods to get the column family schema from Cassandra */
+ protected void initSchema(String signature) throws IOException
+ {
+ Properties properties = UDFContext.getUDFContext().getUDFProperties(CassandraStorage.class);
+
+ // Only get the schema if we haven't already gotten it
+ if (!properties.containsKey(signature))
+ {
+ try
+ {
+ Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
+ client.set_keyspace(keyspace);
+
+ if (username != null && password != null)
+ {
+ Map<String, String> credentials = new HashMap<String, String>(2);
+ credentials.put(PasswordAuthenticator.USERNAME_KEY, username);
+ credentials.put(PasswordAuthenticator.PASSWORD_KEY, password);
+
+ try
+ {
+ client.login(new AuthenticationRequest(credentials));
+ }
+ catch (AuthenticationException e)
+ {
+ logger.error("Authentication exception: invalid username and/or password");
+ throw new IOException(e);
+ }
+ }
+
+                // compose the CfDef for the column family
+ CfDef cfDef = getCfDef(client);
+
+ if (cfDef != null)
+ {
+                    properties.setProperty(signature, cfdefToString(cfDef));
+ }
+ else
+ throw new IOException(String.format("Table '%s' not found in keyspace '%s'",
+ column_family,
+ keyspace));
+ }
+ catch (Exception e)
+ {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ public void checkSchema(ResourceSchema schema) throws IOException
+ {
+        // we don't care about types; they all get cast to ByteBuffers
+ }
+
/** define the schema */
public ResourceSchema getSchema(String location, Job job) throws IOException
{
setLocation(location, job);
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
if (cfDef.column_type.equals("Super"))
return null;
/*
@@ -405,7 +539,7 @@ public class CassandraStorage extends AbstractCassandraStorage
// add key
ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
keyFieldSchema.setName("key");
- keyFieldSchema.setType(getPigType(marshallers.get(MarshallerType.KEY_VALIDATOR)));
+ keyFieldSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.KEY_VALIDATOR)));
ResourceSchema bagSchema = new ResourceSchema();
ResourceFieldSchema bagField = new ResourceFieldSchema();
@@ -419,8 +553,8 @@ public class CassandraStorage extends AbstractCassandraStorage
ResourceFieldSchema bagvalSchema = new ResourceFieldSchema();
bagcolSchema.setName("name");
bagvalSchema.setName("value");
- bagcolSchema.setType(getPigType(marshallers.get(MarshallerType.COMPARATOR)));
- bagvalSchema.setType(getPigType(marshallers.get(MarshallerType.DEFAULT_VALIDATOR)));
+ bagcolSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR)));
+ bagvalSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.DEFAULT_VALIDATOR)));
bagTupleSchema.setFields(new ResourceFieldSchema[] { bagcolSchema, bagvalSchema });
bagTupleField.setSchema(bagTupleSchema);
bagSchema.setFields(new ResourceFieldSchema[] { bagTupleField });
@@ -431,7 +565,7 @@ public class CassandraStorage extends AbstractCassandraStorage
// add the key first, then the indexed columns, and finally the bag
allSchemaFields.add(keyFieldSchema);
- if (!widerows && (cfInfo.compactCqlTable || !cfInfo.cql3Table))
+ if (!widerows)
{
// defined validators/indexes
for (ColumnDef cdef : cfDef.column_metadata)
@@ -445,14 +579,14 @@ public class CassandraStorage extends AbstractCassandraStorage
ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
idxColSchema.setName("name");
- idxColSchema.setType(getPigType(marshallers.get(MarshallerType.COMPARATOR)));
+ idxColSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR)));
ResourceFieldSchema valSchema = new ResourceFieldSchema();
AbstractType validator = validators.get(cdef.name);
if (validator == null)
validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
valSchema.setName("value");
- valSchema.setType(getPigType(validator));
+ valSchema.setType(StorageHelper.getPigType(validator));
innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
allSchemaFields.add(innerTupleField);
@@ -472,7 +606,7 @@ public class CassandraStorage extends AbstractCassandraStorage
AbstractType validator = validators.get(cdef.name);
if (validator == null)
validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
- idxSchema.setType(getPigType(validator));
+ idxSchema.setType(StorageHelper.getPigType(validator));
allSchemaFields.add(idxSchema);
}
}
@@ -485,8 +619,8 @@ public class CassandraStorage extends AbstractCassandraStorage
public void setPartitionFilter(Expression partitionFilter) throws IOException
{
UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
+ Properties property = context.getUDFProperties(CassandraStorage.class);
+ property.setProperty(StorageHelper.PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
}
/** prepare writer */
@@ -495,33 +629,93 @@ public class CassandraStorage extends AbstractCassandraStorage
this.writer = writer;
}
- /** write next row */
- public void putNext(Tuple t) throws IOException
+ /** convert object to ByteBuffer */
+ protected ByteBuffer objToBB(Object o)
{
- /*
- We support two cases for output:
- First, the original output:
- (key, (name, value), (name,value), {(name,value)}) (tuples or bag is optional)
- For supers, we only accept the original output.
- */
+ if (o == null)
+ return nullToBB();
+ if (o instanceof java.lang.String)
+ return ByteBuffer.wrap(new DataByteArray((String)o).get());
+ if (o instanceof Integer)
+ return Int32Type.instance.decompose((Integer)o);
+ if (o instanceof Long)
+ return LongType.instance.decompose((Long)o);
+ if (o instanceof Float)
+ return FloatType.instance.decompose((Float)o);
+ if (o instanceof Double)
+ return DoubleType.instance.decompose((Double)o);
+ if (o instanceof UUID)
+ return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
+        if (o instanceof Tuple)
+        {
+            List<Object> objects = ((Tuple) o).getAll();
+            // collections
+ if (objects.size() > 0 && objects.get(0) instanceof String)
+ {
+ String collectionType = (String) objects.get(0);
+ if ("set".equalsIgnoreCase(collectionType) ||
+ "list".equalsIgnoreCase(collectionType))
+ return objToListOrSetBB(objects.subList(1, objects.size()));
+ else if ("map".equalsIgnoreCase(collectionType))
+ return objToMapBB(objects.subList(1, objects.size()));
- if (t.size() < 1)
+ }
+ return objToCompositeBB(objects);
+ }
+
+ return ByteBuffer.wrap(((DataByteArray) o).get());
+ }
+
+ private ByteBuffer objToListOrSetBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        for (Object sub : objects)
{
- // simply nothing here, we can't even delete without a key
- logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
- return;
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
}
- ByteBuffer key = objToBB(t.get(0));
- if (t.getType(1) == DataType.TUPLE)
- writeColumnsFromTuple(key, t, 1);
- else if (t.getType(1) == DataType.BAG)
+ // NOTE: using protocol v1 serialization format for collections so as to not break
+ // compatibility. Not sure if that's the right thing.
+ return CollectionSerializer.pack(serialized, objects.size(), 1);
+ }
+
+ private ByteBuffer objToMapBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+        for (Object sub : objects)
{
- if (t.size() > 2)
- throw new IOException("No arguments allowed after bag");
- writeColumnsFromBag(key, (DataBag) t.get(1));
+ List<Object> keyValue = ((Tuple)sub).getAll();
+ for (Object entry: keyValue)
+ {
+ ByteBuffer buffer = objToBB(entry);
+ serialized.add(buffer);
+ }
}
- else
- throw new IOException("Second argument in output must be a tuple or bag");
+ // NOTE: using protocol v1 serialization format for collections so as to not break
+ // compatibility. Not sure if that's the right thing.
+ return CollectionSerializer.pack(serialized, objects.size(), 1);
+ }
+
+ private ByteBuffer objToCompositeBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ int totalLength = 0;
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ totalLength += 2 + buffer.remaining() + 1;
+ }
+ ByteBuffer out = ByteBuffer.allocate(totalLength);
+ for (ByteBuffer bb : serialized)
+ {
+ int length = bb.remaining();
+ out.put((byte) ((length >> 8) & 0xFF));
+ out.put((byte) (length & 0xFF));
+ out.put(bb);
+ out.put((byte) 0);
+ }
+ out.flip();
+ return out;
}
/** write tuple data to cassandra */
@@ -643,6 +837,19 @@ public class CassandraStorage extends AbstractCassandraStorage
}
}
+ /** get a list of columns with a defined index */
+ protected List<ColumnDef> getIndexes() throws IOException
+ {
+ CfDef cfdef = getCfDef(loadSignature);
+ List<ColumnDef> indexes = new ArrayList<ColumnDef>();
+ for (ColumnDef cdef : cfdef.column_metadata)
+ {
+ if (cdef.index_type != null)
+ indexes.add(cdef);
+ }
+ return indexes;
+ }
+
/** get a list of Cassandra IndexExpression from Pig expression */
private List<IndexExpression> filterToIndexExpressions(Expression expression) throws IOException
{
@@ -713,13 +920,64 @@ public class CassandraStorage extends AbstractCassandraStorage
return indexClause.getExpressions();
}
+ public ResourceStatistics getStatistics(String location, Job job)
+ {
+ return null;
+ }
+
+ public void cleanupOnFailure(String failure, Job job)
+ {
+ }
+
+ public void cleanupOnSuccess(String location, Job job) throws IOException
+ {
+ }
+
+
+ /** StoreFunc methods */
+ public void setStoreFuncUDFContextSignature(String signature)
+ {
+ this.storeSignature = signature;
+ }
+
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+ {
+ return relativeToAbsolutePath(location, curDir);
+ }
+
+ /** output format */
+ public OutputFormat getOutputFormat() throws IOException
+ {
+ try
+ {
+ return FBUtilities.construct(outputFormatClass, "outputformat");
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+
+ @Override
+ public InputFormat getInputFormat() throws IOException
+ {
+ try
+ {
+ return FBUtilities.construct(inputFormatClass, "inputformat");
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
/** get a list of index expression */
private List<IndexExpression> getIndexExpressions() throws IOException
{
UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
- return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
+ Properties property = context.getUDFProperties(CassandraStorage.class);
+ if (property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE) != null)
+ return indexExpressionsFromString(property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE));
else
return null;
}
@@ -731,6 +989,129 @@ public class CassandraStorage extends AbstractCassandraStorage
return getColumnMeta(client, true, true);
}
+
+ /** get column meta data */
+ protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn)
+ throws org.apache.cassandra.thrift.InvalidRequestException,
+ UnavailableException,
+ TimedOutException,
+ SchemaDisagreementException,
+ TException,
+ CharacterCodingException,
+ org.apache.cassandra.exceptions.InvalidRequestException,
+ ConfigurationException,
+ NotFoundException
+ {
+ String query = String.format("SELECT column_name, validator, index_type, type " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+ SystemKeyspace.NAME,
+ LegacySchemaTables.COLUMNS,
+ keyspace,
+ column_family);
+
+ CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
+
+ List<CqlRow> rows = result.rows;
+ List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
+ if (rows == null || rows.isEmpty())
+ {
+ // if CassandraStorage, just return the empty list
+ if (cassandraStorage)
+ return columnDefs;
+
+ // otherwise for CqlNativeStorage, check metadata for classic thrift tables
+ CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
+ for (ColumnDefinition def : cfm.regularAndStaticColumns())
+ {
+ ColumnDef cDef = new ColumnDef();
+ String columnName = def.name.toString();
+ String type = def.type.toString();
+ logger.debug("name: {}, type: {} ", columnName, type);
+ cDef.name = ByteBufferUtil.bytes(columnName);
+ cDef.validation_class = type;
+ columnDefs.add(cDef);
+ }
+ // we may not need to include the value column for compact tables as we
+ // could have already processed it as schema_columnfamilies.value_alias
+ if (columnDefs.size() == 0 && includeCompactValueColumn && cfm.compactValueColumn() != null)
+ {
+ ColumnDefinition def = cfm.compactValueColumn();
+ if ("value".equals(def.name.toString()))
+ {
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = def.name.bytes;
+ cDef.validation_class = def.type.toString();
+ columnDefs.add(cDef);
+ }
+ }
+ return columnDefs;
+ }
+
+ Iterator<CqlRow> iterator = rows.iterator();
+ while (iterator.hasNext())
+ {
+ CqlRow row = iterator.next();
+ ColumnDef cDef = new ColumnDef();
+ String type = ByteBufferUtil.string(row.getColumns().get(3).value);
+ if (!type.equals("regular"))
+ continue;
+ cDef.setName(ByteBufferUtil.clone(row.getColumns().get(0).value));
+ cDef.validation_class = ByteBufferUtil.string(row.getColumns().get(1).value);
+ ByteBuffer indexType = row.getColumns().get(2).value;
+ if (indexType != null)
+ cDef.index_type = getIndexType(ByteBufferUtil.string(indexType));
+ columnDefs.add(cDef);
+ }
+ return columnDefs;
+ }
+
+
+ /** get CFMetaData of a column family */
+ protected CFMetaData getCFMetaData(String ks, String cf, Cassandra.Client client)
+ throws NotFoundException,
+ org.apache.cassandra.thrift.InvalidRequestException,
+ TException,
+ org.apache.cassandra.exceptions.InvalidRequestException,
+ ConfigurationException
+ {
+ KsDef ksDef = client.describe_keyspace(ks);
+ for (CfDef cfDef : ksDef.cf_defs)
+ {
+ if (cfDef.name.equalsIgnoreCase(cf))
+ return ThriftConversion.fromThrift(cfDef);
+ }
+ return null;
+ }
+
+ /** get index type from string */
+ protected IndexType getIndexType(String type)
+ {
+ type = type.toLowerCase();
+ if ("keys".equals(type))
+ return IndexType.KEYS;
+ else if("custom".equals(type))
+ return IndexType.CUSTOM;
+ else if("composites".equals(type))
+ return IndexType.COMPOSITES;
+ else
+ return null;
+ }
+
+ /** return partition keys */
+ public String[] getPartitionKeys(String location, Job job) throws IOException
+ {
+ if (!usePartitionFilter)
+ return null;
+ List<ColumnDef> indexes = getIndexes();
+ String[] partitionKeys = new String[indexes.size()];
+ for (int i = 0; i < indexes.size(); i++)
+ {
+ partitionKeys[i] = new String(indexes.get(i).getName());
+ }
+ return partitionKeys;
+ }
+
/** convert key to a tuple */
private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
{
@@ -744,15 +1125,26 @@ public class CassandraStorage extends AbstractCassandraStorage
{
if( comparator instanceof AbstractCompositeType )
{
- setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key));
+ StorageHelper.setTupleValue(tuple, 0, composeComposite((AbstractCompositeType) comparator, key));
}
else
{
- setTupleValue(tuple, 0, cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key));
+ StorageHelper.setTupleValue(tuple, 0, StorageHelper.cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key, nativeProtocolVersion));
}
}
+ /** Deconstructs a composite type to a Tuple. */
+ protected Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException
+ {
+ List<AbstractCompositeType.CompositeComponent> result = comparator.deconstruct(name);
+ Tuple t = TupleFactory.getInstance().newTuple(result.size());
+ for (int i=0; i<result.size(); i++)
+ StorageHelper.setTupleValue(t, i, StorageHelper.cassandraToObj(result.get(i).comparator, result.get(i).value, nativeProtocolVersion));
+
+ return t;
+ }
+
/** cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>
* [&reversed=true][&limit=1][&allow_deletes=true][&widerows=true]
* [&use_secondary=true][&comparator=<comparator>][&partitioner=<partitioner>]]*/
@@ -817,10 +1209,206 @@ public class CassandraStorage extends AbstractCassandraStorage
"[&init_address=<host>][&rpc_port=<port>]]': " + e.getMessage());
}
}
-
+
+
+ /** decompose the query to store the parameters in a map */
+ public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
+ {
+ String[] params = query.split("&");
+ Map<String, String> map = new HashMap<String, String>(params.length);
+ for (String param : params)
+ {
+ String[] keyValue = param.split("=");
+ map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8"));
+ }
+ return map;
+ }
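getQueryMap splits the query string on '&' and '=' and URL-decodes each value,
so every parameter must carry exactly one value. An illustrative call:

    // keys are taken verbatim, values are URL-decoded
    Map<String, String> opts = getQueryMap("slice_start=a&limit=1&reversed=true");
    assert "1".equals(opts.get("limit"));
    assert "true".equals(opts.get("reversed"));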
+
public ByteBuffer nullToBB()
{
return null;
}
-}
+ /** return the CfDef for the column family */
+ protected CfDef getCfDef(Cassandra.Client client)
+ throws org.apache.cassandra.thrift.InvalidRequestException,
+ UnavailableException,
+ TimedOutException,
+ SchemaDisagreementException,
+ TException,
+ NotFoundException,
+ org.apache.cassandra.exceptions.InvalidRequestException,
+ ConfigurationException,
+ IOException
+ {
+ // get CF meta data
+ String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+ SystemKeyspace.NAME,
+ LegacySchemaTables.COLUMNFAMILIES,
+ keyspace,
+ column_family);
+
+ CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
+
+ if (result == null || result.rows == null || result.rows.isEmpty())
+ return null;
+
+ Iterator<CqlRow> iteraRow = result.rows.iterator();
+ CfDef cfDef = new CfDef();
+ cfDef.keyspace = keyspace;
+ cfDef.name = column_family;
+ if (iteraRow.hasNext())
+ {
+ CqlRow cqlRow = iteraRow.next();
+
+ cfDef.column_type = ByteBufferUtil.string(cqlRow.columns.get(0).value);
+ cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(1).value);
+ ByteBuffer subComparator = cqlRow.columns.get(2).value;
+ if (subComparator != null)
+ cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
+ cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value);
+ cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value);
+ }
+ cfDef.column_metadata = getColumnMetadata(client);
+ return cfDef;
+ }
+
+ /** get the columnfamily definition for the signature */
+ protected CfDef getCfDef(String signature) throws IOException
+ {
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(CassandraStorage.class);
+ String prop = property.getProperty(signature);
+ return cfdefFromString(prop);
+ }
+
+ /** convert string back to CfDef */
+ protected static CfDef cfdefFromString(String st) throws IOException
+ {
+ assert st != null;
+ TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+ CfDef cfDef = new CfDef();
+ try
+ {
+ deserializer.deserialize(cfDef, Hex.hexToBytes(st));
+ }
+ catch (TException e)
+ {
+ throw new IOException(e);
+ }
+ return cfDef;
+ }
+
+ /** convert CfDef to string */
+ protected static String cfdefToString(CfDef cfDef) throws IOException
+ {
+ assert cfDef != null;
+ // this is so awful it's kind of cool!
+ TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
+ try
+ {
+ return Hex.bytesToHex(serializer.serialize(cfDef));
+ }
+ catch (TException e)
+ {
+ throw new IOException(e);
+ }
+ }
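cfdefToString and cfdefFromString round-trip a Thrift CfDef through binary
serialization and hex so it fits in the string-only UDFContext properties. A
minimal round trip; the keyspace and table names are illustrative:

    CfDef cfDef = new CfDef();
    cfDef.keyspace = "ks";
    cfDef.name = "cf";
    String frozen = cfdefToString(cfDef);   // hex-encoded Thrift bytes
    CfDef thawed = cfdefFromString(frozen); // equal to the original
    assert cfDef.equals(thawed);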
+
+ /** parse the string to a cassandra data type */
+ protected AbstractType parseType(String type) throws IOException
+ {
+ try
+ {
+ // always treat counters like longs, specifically CCT.compose is not what we need
+ if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
+ return LongType.instance;
+ return TypeParser.parse(type);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ catch (SyntaxException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ /** convert a column to a tuple */
+ protected Tuple columnToTuple(Cell col, CfDef cfDef, AbstractType comparator) throws IOException
+ {
+ Tuple pair = TupleFactory.getInstance().newTuple(2);
+
+ ByteBuffer colName = col.name().toByteBuffer();
+
+ // name
+ if(comparator instanceof AbstractCompositeType)
+ StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, colName));
+ else
+ StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, colName, nativeProtocolVersion));
+
+ // value
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+ if (validators.get(colName) == null)
+ {
+ Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value(), nativeProtocolVersion));
+ }
+ else
+ StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(colName), col.value(), nativeProtocolVersion));
+ return pair;
+ }
+
+ /** construct a map of marshaller type to cassandra data type */
+ protected Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
+ {
+ Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class);
+ AbstractType comparator;
+ AbstractType subcomparator;
+ AbstractType default_validator;
+ AbstractType key_validator;
+
+ comparator = parseType(cfDef.getComparator_type());
+ subcomparator = parseType(cfDef.getSubcomparator_type());
+ default_validator = parseType(cfDef.getDefault_validation_class());
+ key_validator = parseType(cfDef.getKey_validation_class());
+
+ marshallers.put(MarshallerType.COMPARATOR, comparator);
+ marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator);
+ marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator);
+ marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator);
+ return marshallers;
+ }
+
+ /** get the validators */
+ protected Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
+ {
+ Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
+ for (ColumnDef cd : cfDef.getColumn_metadata())
+ {
+ if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
+ {
+ AbstractType validator = null;
+ try
+ {
+ validator = TypeParser.parse(cd.getValidation_class());
+ if (validator instanceof CounterColumnType)
+ validator = LongType.instance;
+ validators.put(cd.name, validator);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ catch (SyntaxException e)
+ {
+ throw new IOException(e);
+ }
+ }
+ }
+ return validators;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 7887085..91cdd02 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -17,48 +17,78 @@
*/
package org.apache.cassandra.hadoop.pig;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
import java.util.*;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
import org.apache.cassandra.db.BufferCell;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.AuthenticationException;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.HadoopCompat;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.utils.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
-import org.apache.pig.Expression;
-import org.apache.pig.ResourceSchema;
+import org.apache.pig.*;
import org.apache.pig.Expression.OpType;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.*;
import org.apache.pig.impl.util.UDFContext;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.external.biz.base64Coder.Base64Coder;
-import com.datastax.driver.core.Row;
-public class CqlNativeStorage extends AbstractCassandraStorage
+public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
{
+ protected String DEFAULT_INPUT_FORMAT;
+ protected String DEFAULT_OUTPUT_FORMAT;
+
+ protected String username;
+ protected String password;
+ protected String keyspace;
+ protected String column_family;
+ protected String loadSignature;
+ protected String storeSignature;
+
+ protected Configuration conf;
+ protected String inputFormatClass;
+ protected String outputFormatClass;
+ protected int splitSize = 64 * 1024;
+ protected String partitionerClass;
+ protected boolean usePartitionFilter = false;
+ protected String initHostAddress;
+ protected String rpcPort;
+ protected int nativeProtocolVersion = 1;
+
private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class);
private int pageSize = 1000;
private String columns;
private String outputQuery;
private String whereClause;
- private boolean hasCompactValueAlias = false;
private RecordReader<Long, Row> reader;
private RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
@@ -119,21 +149,20 @@ public class CqlNativeStorage extends AbstractCassandraStorage
if (!reader.nextKeyValue())
return null;
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ TableInfo tableMetadata = getCfInfo(loadSignature);
Row row = reader.getCurrentValue();
- Tuple tuple = TupleFactory.getInstance().newTuple(cfDef.column_metadata.size());
- Iterator<ColumnDef> itera = cfDef.column_metadata.iterator();
+ Tuple tuple = TupleFactory.getInstance().newTuple(tableMetadata.getColumns().size());
+ Iterator<ColumnInfo> itera = tableMetadata.getColumns().iterator();
int i = 0;
while (itera.hasNext())
{
- ColumnDef cdef = itera.next();
- ByteBuffer columnValue = row.getBytesUnsafe(ByteBufferUtil.string(cdef.name.duplicate()));
+ ColumnInfo cdef = itera.next();
+ ByteBuffer columnValue = row.getBytesUnsafe(cdef.getName());
if (columnValue != null)
{
- Cell cell = new BufferCell(CellNames.simpleDense(cdef.name), columnValue);
- AbstractType<?> validator = getValidatorMap(cfDef).get(cdef.name);
- setTupleValue(tuple, i, cqlColumnToObj(cell, cfDef), validator);
+ Cell cell = new BufferCell(CellNames.simpleDense(ByteBufferUtil.bytes(cdef.getName())), columnValue);
+ AbstractType<?> validator = getValidatorMap(tableMetadata).get(ByteBufferUtil.bytes(cdef.getName()));
+ setTupleValue(tuple, i, cqlColumnToObj(cell, tableMetadata), validator);
}
else
tuple.set(i, null);
@@ -148,15 +177,12 @@ public class CqlNativeStorage extends AbstractCassandraStorage
}
/** convert a cql column to an object */
- private Object cqlColumnToObj(Cell col, CfDef cfDef) throws IOException
+ private Object cqlColumnToObj(Cell col, TableInfo cfDef) throws IOException
{
// standard
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
ByteBuffer cellName = col.name().toByteBuffer();
- if (validators.get(cellName) == null)
- return cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.DEFAULT_VALIDATOR), col.value());
- else
- return cassandraToObj(validators.get(cellName), col.value());
+ return StorageHelper.cassandraToObj(validators.get(cellName), col.value(), nativeProtocolVersion);
}
/** set the value to the position of the tuple */
@@ -165,7 +191,7 @@ public class CqlNativeStorage extends AbstractCassandraStorage
if (validator instanceof CollectionType)
setCollectionTupleValues(tuple, position, value, validator);
else
- setTupleValue(tuple, position, value);
+ StorageHelper.setTupleValue(tuple, position, value);
}
/** set the values of set/list at and after the position of the tuple */
@@ -220,173 +246,33 @@ public class CqlNativeStorage extends AbstractCassandraStorage
return obj;
}
- /** include key columns */
- protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
- throws InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException,
- TException,
- CharacterCodingException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- NotFoundException
- {
- List<ColumnDef> keyColumns = null;
- // get key columns
+ /** get the cached TableInfo for the signature */
+ protected TableInfo getCfInfo(String signature) throws IOException
+ {
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(CqlNativeStorage.class);
+ TableInfo cfInfo;
try
{
- keyColumns = getKeysMeta(client);
+ cfInfo = cfdefFromString(property.getProperty(signature));
}
- catch(Exception e)
+ catch (ClassNotFoundException e)
{
- logger.error("Error in retrieving key columns" , e);
+ throw new IOException(e);
}
-
- // get other columns
- List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias);
-
- // combine all columns in a list
- if (keyColumns != null && columns != null)
- keyColumns.addAll(columns);
-
- return keyColumns;
+ return cfInfo;
}
- /** get keys meta data */
- private List<ColumnDef> getKeysMeta(Cassandra.Client client)
- throws Exception
+ /** return the TableMetadata for the column family */
+ protected TableMetadata getCfInfo(Session client)
+ throws NoHostAvailableException,
+ AuthenticationException,
+ IllegalStateException
{
- String query = "SELECT key_aliases, " +
- " column_aliases, " +
- " key_validator, " +
- " comparator, " +
- " keyspace_name, " +
- " value_alias, " +
- " default_validator " +
- "FROM system.schema_columnfamilies " +
- "WHERE keyspace_name = '%s'" +
- " AND columnfamily_name = '%s' ";
-
- CqlResult result = client.execute_cql3_query(
- ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
- Compression.NONE,
- ConsistencyLevel.ONE);
-
- if (result == null || result.rows == null || result.rows.isEmpty())
- return null;
-
- Iterator<CqlRow> iteraRow = result.rows.iterator();
- List<ColumnDef> keys = new ArrayList<ColumnDef>();
- if (iteraRow.hasNext())
- {
- CqlRow cqlRow = iteraRow.next();
- String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
- logger.debug("Found ksDef name: {}", name);
- String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
-
- logger.debug("partition keys: {}", keyString);
- List<String> keyNames = FBUtilities.fromJsonList(keyString);
-
- Iterator<String> iterator = keyNames.iterator();
- while (iterator.hasNext())
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(iterator.next());
- keys.add(cDef);
- }
- // classic thrift tables
- if (keys.size() == 0)
- {
- CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
- for (ColumnDefinition def : cfm.partitionKeyColumns())
- {
- String key = def.name.toString();
- logger.debug("name: {} ", key);
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(key);
- keys.add(cDef);
- }
- for (ColumnDefinition def : cfm.clusteringColumns())
- {
- String key = def.name.toString();
- logger.debug("name: {} ", key);
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(key);
- keys.add(cDef);
- }
- }
-
- keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
-
- logger.debug("cluster keys: {}", keyString);
- keyNames = FBUtilities.fromJsonList(keyString);
-
- iterator = keyNames.iterator();
- while (iterator.hasNext())
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(iterator.next());
- keys.add(cDef);
- }
-
- String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
- logger.debug("row key validator: {}", validator);
- AbstractType<?> keyValidator = parseType(validator);
-
- Iterator<ColumnDef> keyItera = keys.iterator();
- if (keyValidator instanceof CompositeType)
- {
- Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
- while (typeItera.hasNext())
- keyItera.next().validation_class = typeItera.next().toString();
- }
- else
- keyItera.next().validation_class = keyValidator.toString();
-
- validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
- logger.debug("cluster key validator: {}", validator);
-
- if (keyItera.hasNext() && validator != null && !validator.isEmpty())
- {
- AbstractType<?> clusterKeyValidator = parseType(validator);
-
- if (clusterKeyValidator instanceof CompositeType)
- {
- Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
- while (keyItera.hasNext())
- keyItera.next().validation_class = typeItera.next().toString();
- }
- else
- keyItera.next().validation_class = clusterKeyValidator.toString();
- }
-
- // compact value_alias column
- if (cqlRow.columns.get(5).value != null)
- {
- try
- {
- String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
- logger.debug("default validator: {}", compactValidator);
- AbstractType<?> defaultValidator = parseType(compactValidator);
-
- ColumnDef cDef = new ColumnDef();
- cDef.name = cqlRow.columns.get(5).value;
- cDef.validation_class = defaultValidator.toString();
- keys.add(cDef);
- hasCompactValueAlias = true;
- }
- catch (Exception e)
- {
- // no compact column at value_alias
- }
- }
-
- }
- return keys;
+ // get CF meta data
+ return client.getCluster().getMetadata().getKeyspace(Metadata.quote(keyspace)).getTable(Metadata.quote(column_family));
}
-
/** output: (((name, value), (name, value)), (value ... value), (value...value)) */
public void putNext(Tuple t) throws IOException
{
@@ -441,6 +327,91 @@ public class CqlNativeStorage extends AbstractCassandraStorage
return keys;
}
+ /** convert object to ByteBuffer */
+ protected ByteBuffer objToBB(Object o)
+ {
+ if (o == null)
+ return nullToBB();
+ if (o instanceof java.lang.String)
+ return ByteBuffer.wrap(new DataByteArray((String)o).get());
+ if (o instanceof Integer)
+ return Int32Type.instance.decompose((Integer)o);
+ if (o instanceof Long)
+ return LongType.instance.decompose((Long)o);
+ if (o instanceof Float)
+ return FloatType.instance.decompose((Float)o);
+ if (o instanceof Double)
+ return DoubleType.instance.decompose((Double)o);
+ if (o instanceof UUID)
+ return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
+ if (o instanceof Tuple)
+ {
+ List<Object> objects = ((Tuple)o).getAll();
+ //collections
+ if (objects.size() > 0 && objects.get(0) instanceof String)
+ {
+ String collectionType = (String) objects.get(0);
+ if ("set".equalsIgnoreCase(collectionType) ||
+ "list".equalsIgnoreCase(collectionType))
+ return objToListOrSetBB(objects.subList(1, objects.size()));
+ else if ("map".equalsIgnoreCase(collectionType))
+ return objToMapBB(objects.subList(1, objects.size()));
+
+ }
+ return objToCompositeBB(objects);
+ }
+
+ return ByteBuffer.wrap(((DataByteArray) o).get());
+ }
+
+ private ByteBuffer objToListOrSetBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ }
+ return CollectionSerializer.pack(serialized, objects.size(), 3);
+ }
+
+ private ByteBuffer objToMapBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+ for(Object sub : objects)
+ {
+ List<Object> keyValue = ((Tuple)sub).getAll();
+ for (Object entry: keyValue)
+ {
+ ByteBuffer buffer = objToBB(entry);
+ serialized.add(buffer);
+ }
+ }
+ return CollectionSerializer.pack(serialized, objects.size(), 3);
+ }
+
+ private ByteBuffer objToCompositeBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ int totalLength = 0;
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ totalLength += 2 + buffer.remaining() + 1;
+ }
+ ByteBuffer out = ByteBuffer.allocate(totalLength);
+ for (ByteBuffer bb : serialized)
+ {
+ int length = bb.remaining();
+ out.put((byte) ((length >> 8) & 0xFF));
+ out.put((byte) (length & 0xFF));
+ out.put(bb);
+ out.put((byte) 0);
+ }
+ out.flip();
+ return out;
+ }
+
/** send CQL query request using data from tuple */
private void cqlQueryFromTuple(Map<String, ByteBuffer> key, Tuple t, int offset) throws IOException
{
@@ -487,30 +458,50 @@ public class CqlNativeStorage extends AbstractCassandraStorage
}
}
+ /** get the validators */
+ protected Map<ByteBuffer, AbstractType> getValidatorMap(TableInfo cfDef) throws IOException
+ {
+ Map<ByteBuffer, AbstractType> validators = new HashMap<>();
+ for (ColumnInfo cd : cfDef.getColumns())
+ {
+ if (cd.getTypeName() != null)
+ {
+ try
+ {
+ AbstractType validator = TypeParser.parseCqlName(cd.getTypeName());
+ if (validator instanceof CounterColumnType)
+ validator = LongType.instance;
+ validators.put(ByteBufferUtil.bytes(cd.getName()), validator);
+ }
+ catch (ConfigurationException | SyntaxException e)
+ {
+ throw new IOException(e);
+ }
+ }
+ }
+ return validators;
+ }
+
/** schema: (value, value, value) where keys are in the front. */
public ResourceSchema getSchema(String location, Job job) throws IOException
{
setLocation(location, job);
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ TableInfo cfInfo = getCfInfo(loadSignature);
// top-level schema, no type
ResourceSchema schema = new ResourceSchema();
- // get default marshallers and validators
- Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
- Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfDef);
+ // get default validators
+ Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfInfo);
// will contain all fields for this schema
List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
- for (ColumnDef cdef : cfDef.column_metadata)
+ for (ColumnInfo cdef : cfInfo.getColumns())
{
ResourceFieldSchema valSchema = new ResourceFieldSchema();
- AbstractType validator = validators.get(cdef.name);
- if (validator == null)
- validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
+ AbstractType validator = validators.get(cdef.getName());
valSchema.setName(new String(cdef.getName()));
- valSchema.setType(getPigType(validator));
+ valSchema.setType(StorageHelper.getPigType(validator));
allSchemaFields.add(valSchema);
}
@@ -522,8 +513,8 @@ public class CqlNativeStorage extends AbstractCassandraStorage
public void setPartitionFilter(Expression partitionFilter) throws IOException
{
UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- property.setProperty(PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter));
+ Properties property = context.getUDFProperties(CqlNativeStorage.class);
+ property.setProperty(StorageHelper.PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter));
}
/**
@@ -557,8 +548,8 @@ public class CqlNativeStorage extends AbstractCassandraStorage
private String getWhereClauseForPartitionFilter()
{
UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- return property.getProperty(PARTITION_FILTER_SIGNATURE);
+ Properties property = context.getUDFProperties(CqlNativeStorage.class);
+ return property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE);
}
/** set read configuration settings */
@@ -631,7 +622,7 @@ public class CqlNativeStorage extends AbstractCassandraStorage
CqlConfigHelper.setInputWhereClauses(conf, whereClause);
String whereClauseForPartitionFilter = getWhereClauseForPartitionFilter();
- String wc = whereClause != null && !whereClause.trim().isEmpty()
+ String wc = whereClause != null && !whereClause.trim().isEmpty()
? whereClauseForPartitionFilter == null ? whereClause: String.format("%s AND %s", whereClause.trim(), whereClauseForPartitionFilter)
: whereClauseForPartitionFilter;
@@ -639,17 +630,17 @@ public class CqlNativeStorage extends AbstractCassandraStorage
{
logger.debug("where clause: {}", wc);
CqlConfigHelper.setInputWhereClauses(conf, wc);
- }
- if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+ }
+ if (System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE) != null)
{
try
{
- ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+ ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE)));
}
catch (NumberFormatException e)
{
throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e);
- }
+ }
}
if (ConfigHelper.getInputInitialAddress(conf) == null)
@@ -700,6 +691,74 @@ public class CqlNativeStorage extends AbstractCassandraStorage
initSchema(storeSignature);
}
+ /** Methods to get the column family schema from Cassandra */
+ protected void initSchema(String signature) throws IOException
+ {
+ Properties properties = UDFContext.getUDFContext().getUDFProperties(CqlNativeStorage.class);
+
+ // Only get the schema if we haven't already gotten it
+ if (!properties.containsKey(signature))
+ {
+ try
+ {
+ Session client = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf).connect();
+ client.execute("USE " + keyspace);
+
+ // compose the table metadata for the column family
+ TableMetadata cfInfo = getCfInfo(client);
+
+ if (cfInfo != null)
+ {
+ properties.setProperty(signature, cfdefToString(cfInfo));
+ }
+ else
+ throw new IOException(String.format("Table '%s' not found in keyspace '%s'",
+ column_family,
+ keyspace));
+ }
+ catch (Exception e)
+ {
+ throw new IOException(e);
+ }
+ }
+ }
+
+
+ /** serialize the table metadata to a Base64 string */
+ protected static String cfdefToString(TableMetadata cfDef) throws IOException
+ {
+ TableInfo tableInfo = new TableInfo(cfDef);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(tableInfo);
+ oos.close();
+ return new String(Base64Coder.encode(baos.toByteArray()));
+ }
+
+ /** deserialize a TableInfo from its Base64 string form */
+ protected static TableInfo cfdefFromString(String st) throws IOException, ClassNotFoundException
+ {
+ byte[] data = Base64Coder.decode(st);
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
+ Object o = ois.readObject();
+ ois.close();
+ return (TableInfo)o;
+ }
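Where CassandraStorage freezes a Thrift CfDef as hex, CqlNativeStorage snapshots
the driver's non-Serializable TableMetadata into the Serializable TableInfo
defined at the end of this file and Base64-encodes plain Java serialization; the
resulting string travels to the Hadoop tasks in UDFContext properties. A sketch,
assuming an open Session and illustrative keyspace and table names:

    TableMetadata meta = client.getCluster().getMetadata()
                               .getKeyspace("ks").getTable("cf");
    String frozen = cfdefToString(meta);        // Base64 of a serialized TableInfo
    TableInfo thawed = cfdefFromString(frozen); // usable without a live cluster
    String firstColumn = thawed.getColumns().get(0).getName();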
+
+ /** decompose the query to store the parameters in a map */
+ public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
+ {
+ String[] params = query.split("&");
+ Map<String, String> map = new HashMap<String, String>(params.length);
+ for (String param : params)
+ {
+ String[] keyValue = param.split("=");
+ map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8"));
+ }
+ return map;
+ }
+
private void setLocationFromUri(String location) throws IOException
{
try
@@ -808,11 +867,171 @@ public class CqlNativeStorage extends AbstractCassandraStorage
}
}
- /**
- * Thrift API can't handle null, so use empty byte array
- */
public ByteBuffer nullToBB()
{
return ByteBuffer.wrap(new byte[0]);
}
+
+ /** output format */
+ public OutputFormat getOutputFormat() throws IOException
+ {
+ try
+ {
+ return FBUtilities.construct(outputFormatClass, "outputformat");
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ public void cleanupOnFailure(String failure, Job job)
+ {
+ }
+
+ public void cleanupOnSuccess(String location, Job job) throws IOException
+ {
+ }
+
+ /** return partition keys */
+ public String[] getPartitionKeys(String location, Job job) throws IOException
+ {
+ if (!usePartitionFilter)
+ return null;
+ TableInfo tableMetadata = getCfInfo(loadSignature);
+ String[] partitionKeys = new String[tableMetadata.getPartitionKey().size()];
+ for (int i = 0; i < tableMetadata.getPartitionKey().size(); i++)
+ {
+ partitionKeys[i] = new String(tableMetadata.getPartitionKey().get(i).getName());
+ }
+ return partitionKeys;
+ }
+
+ public void checkSchema(ResourceSchema schema) throws IOException
+ {
+ // we don't care about types, they all get casted to ByteBuffers
+ }
+
+ public ResourceStatistics getStatistics(String location, Job job)
+ {
+ return null;
+ }
+
+ @Override
+ public InputFormat getInputFormat() throws IOException
+ {
+ try
+ {
+ return FBUtilities.construct(inputFormatClass, "inputformat");
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+ {
+ return relativeToAbsolutePath(location, curDir);
+ }
+
+ @Override
+ public String relativeToAbsolutePath(String location, Path curDir) throws IOException
+ {
+ return location;
+ }
+
+ @Override
+ public void setUDFContextSignature(String signature)
+ {
+ this.loadSignature = signature;
+ }
+
+ /** StoreFunc methods */
+ public void setStoreFuncUDFContextSignature(String signature)
+ {
+ this.storeSignature = signature;
+ }
+
+ /** set hadoop cassandra connection settings */
+ protected void setConnectionInformation() throws IOException
+ {
+ StorageHelper.setConnectionInformation(conf);
+ if (System.getenv(StorageHelper.PIG_INPUT_FORMAT) != null)
+ inputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_INPUT_FORMAT));
+ else
+ inputFormatClass = DEFAULT_INPUT_FORMAT;
+ if (System.getenv(StorageHelper.PIG_OUTPUT_FORMAT) != null)
+ outputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_OUTPUT_FORMAT));
+ else
+ outputFormatClass = DEFAULT_OUTPUT_FORMAT;
+ }
+
+ /** get the full class name */
+ protected String getFullyQualifiedClassName(String classname)
+ {
+ return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
+ }
+}
+
+class TableInfo implements Serializable
+{
+ private final List<ColumnInfo> columns;
+ private final List<ColumnInfo> partitionKey;
+ private final String name;
+
+ public TableInfo(TableMetadata tableMetadata)
+ {
+ List<ColumnMetadata> cmColumns = tableMetadata.getColumns();
+ columns = new ArrayList<>(cmColumns.size());
+ for (ColumnMetadata cm : cmColumns)
+ {
+ columns.add(new ColumnInfo(this, cm));
+ }
+ List<ColumnMetadata> cmPartitionKey = tableMetadata.getPartitionKey();
+ partitionKey = new ArrayList<>(cmPartitionKey.size());
+ for (ColumnMetadata cm : cmPartitionKey)
+ {
+ partitionKey.add(new ColumnInfo(this, cm));
+ }
+ name = tableMetadata.getName();
+ }
+
+ public List<ColumnInfo> getPartitionKey()
+ {
+ return partitionKey;
+ }
+
+ public List<ColumnInfo> getColumns()
+ {
+ return columns;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
}
+
+class ColumnInfo implements Serializable
+{
+ private final TableInfo table;
+ private final String name;
+ private final String typeName;
+
+ public ColumnInfo(TableInfo tableInfo, ColumnMetadata columnMetadata)
+ {
+ table = tableInfo;
+ name = columnMetadata.getName();
+ typeName = columnMetadata.getType().toString();
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public String getTypeName()
+ {
+ return typeName;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java b/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
new file mode 100644
index 0000000..66836b2
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
@@ -0,0 +1,121 @@
+package org.apache.cassandra.hadoop.pig;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.serializers.CollectionSerializer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+public class StorageHelper
+{
+ // system environment variables that can be set to configure connection info:
+ // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
+ public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
+ public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
+ public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
+ public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
+ public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
+ public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
+ public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
+ public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
+ public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+ public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
+ public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
+ public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
+
+
+ public final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
+
+ protected static void setConnectionInformation(Configuration conf)
+ {
+ if (System.getenv(PIG_RPC_PORT) != null)
+ {
+ ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
+ ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
+ }
+
+ if (System.getenv(PIG_INPUT_RPC_PORT) != null)
+ ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT));
+ if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
+ ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT));
+
+ if (System.getenv(PIG_INITIAL_ADDRESS) != null)
+ {
+ ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+ ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+ }
+ if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
+ ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS));
+ if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
+ ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
+
+ if (System.getenv(PIG_PARTITIONER) != null)
+ {
+ ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
+ ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
+ }
+ if(System.getenv(PIG_INPUT_PARTITIONER) != null)
+ ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
+ if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
+ ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
+ }
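setConnectionInformation only reads the PIG_* environment variables; the same
settings can be made directly through ConfigHelper on the job configuration,
which is what those variables map to. An illustrative equivalent of setting
PIG_INITIAL_ADDRESS, PIG_RPC_PORT and PIG_PARTITIONER (host, port and
partitioner values here are examples):

    Configuration conf = new Configuration();
    ConfigHelper.setInputInitialAddress(conf, "127.0.0.1");
    ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
    ConfigHelper.setInputRpcPort(conf, "9160");
    ConfigHelper.setOutputRpcPort(conf, "9160");
    ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.Murmur3Partitioner");
    ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.Murmur3Partitioner");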
+
+ protected static Object cassandraToObj(AbstractType validator, ByteBuffer value, int nativeProtocolVersion)
+ {
+ if (validator instanceof DecimalType || validator instanceof InetAddressType)
+ return validator.getString(value);
+
+ if (validator instanceof CollectionType)
+ {
+ // For CollectionType, the compose() method assumes the v3 protocol format of collection, which
+ // is not correct here since we query using the CQL-over-thrift interface which use the pre-v3 format
+ return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion);
+ }
+
+ return validator.compose(value);
+ }
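cassandraToObj decodes collections at the caller's protocol version because
compose() assumes the v3 layout: protocols 1 and 2 size a collection and its
elements with unsigned shorts, protocol 3 with 4-byte ints. A standalone sketch
of the two layouts for the set {"a", "b"}:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class CollectionLayoutSketch
    {
        public static void main(String[] args)
        {
            byte[] a = "a".getBytes(StandardCharsets.UTF_8);
            byte[] b = "b".getBytes(StandardCharsets.UTF_8);

            // pre-v3: [short count][short length][bytes]... -> 8 bytes here
            ByteBuffer preV3 = ByteBuffer.allocate(2 + (2 + a.length) + (2 + b.length));
            preV3.putShort((short) 2)
                 .putShort((short) a.length).put(a)
                 .putShort((short) b.length).put(b);
            preV3.flip();

            // v3: [int count][int length][bytes]... -> 14 bytes here
            ByteBuffer v3 = ByteBuffer.allocate(4 + (4 + a.length) + (4 + b.length));
            v3.putInt(2).putInt(a.length).put(a).putInt(b.length).put(b);
            v3.flip();

            System.out.println(preV3.remaining() + " vs " + v3.remaining()); // 8 vs 14
        }
    }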
+
+ /** set the value to the position of the tuple */
+ protected static void setTupleValue(Tuple pair, int position, Object value) throws ExecException
+ {
+ if (value instanceof BigInteger)
+ pair.set(position, ((BigInteger) value).intValue());
+ else if (value instanceof ByteBuffer)
+ pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
+ else if (value instanceof UUID)
+ pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value)));
+ else if (value instanceof Date)
+ pair.set(position, TimestampType.instance.decompose((Date) value).getLong());
+ else
+ pair.set(position, value);
+ }
+
+ /** get pig type for the cassandra data type*/
+ protected static byte getPigType(AbstractType type)
+ {
+ if (type instanceof LongType || type instanceof DateType || type instanceof TimestampType) // DateType is bad and it should feel bad
+ return DataType.LONG;
+ else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger
+ return DataType.INTEGER;
+ else if (type instanceof AsciiType || type instanceof UTF8Type || type instanceof DecimalType || type instanceof InetAddressType)
+ return DataType.CHARARRAY;
+ else if (type instanceof FloatType)
+ return DataType.FLOAT;
+ else if (type instanceof DoubleType)
+ return DataType.DOUBLE;
+ else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
+ return DataType.TUPLE;
+
+ return DataType.BYTEARRAY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 06d83dd..6991958 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -92,7 +92,7 @@ public class SSTableLoader implements StreamEventHandler
return false;
}
- CFMetaData metadata = client.getCFMetaData(keyspace, desc.cfname);
+ CFMetaData metadata = client.getTableMetadata(desc.cfname);
if (metadata == null)
{
outputHandler.output(String.format("Skipping file %s: table %s.%s doesn't exist", name, keyspace, desc.cfname));
@@ -251,7 +251,9 @@ public class SSTableLoader implements StreamEventHandler
/**
* Stop the client.
*/
- public void stop() {}
+ public void stop()
+ {
+ }
/**
* Provides connection factory.
@@ -268,7 +270,12 @@ public class SSTableLoader implements StreamEventHandler
* Validate that {@code keyspace} is an existing keyspace and {@code
* cfName} one of its existing column family.
*/
- public abstract CFMetaData getCFMetaData(String keyspace, String cfName);
+ public abstract CFMetaData getTableMetadata(String tableName);
+
+ public void setTableMetadata(CFMetaData cfm)
+ {
+ throw new RuntimeException();
+ }
public Map<InetAddress, Collection<Range<Token>>> getEndpointToRangesMap()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d6ce46e..c17d2d7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4117,8 +4117,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
SSTableLoader.Client client = new SSTableLoader.Client()
{
+ private String keyspace;
+
public void init(String keyspace)
{
+ this.keyspace = keyspace;
try
{
setPartitioner(DatabaseDescriptor.getPartitioner());
@@ -4135,14 +4138,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
- public CFMetaData getCFMetaData(String keyspace, String cfName)
+ public CFMetaData getTableMetadata(String tableName)
{
- return Schema.instance.getCFMetaData(keyspace, cfName);
+ return Schema.instance.getCFMetaData(keyspace, tableName);
}
};
- SSTableLoader loader = new SSTableLoader(dir, client, new OutputHandler.LogOutput());
- return loader.stream();
+ return new SSTableLoader(dir, client, new OutputHandler.LogOutput()).stream();
}
public void rescheduleFailedDeletions()