Posted to commits@cassandra.apache.org by al...@apache.org on 2015/05/05 22:57:01 UTC
[4/4] cassandra git commit: Remove Thrift dependencies in bundled tools
Remove Thrift dependencies in bundled tools
patch by Philip Thompson; reviewed by Aleksey Yeschenko for CASSANDRA-8358
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f698cc22
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f698cc22
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f698cc22
Branch: refs/heads/trunk
Commit: f698cc228452e847e3ad46bd8178549cf8171767
Parents: 5b61545
Author: Philip Thompson <pt...@gmail.com>
Authored: Tue May 5 21:38:23 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue May 5 23:57:39 2015 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 3 +
build.xml | 8 +-
lib/cassandra-driver-core-2.1.2.jar | Bin 638544 -> 0 bytes
lib/cassandra-driver-core-2.1.5-shaded.jar | Bin 0 -> 1994984 bytes
.../apache/cassandra/db/marshal/TypeParser.java | 71 +-
.../hadoop/AbstractBulkOutputFormat.java | 73 --
.../hadoop/AbstractBulkRecordWriter.java | 239 -------
.../hadoop/AbstractColumnFamilyInputFormat.java | 245 +++----
.../AbstractColumnFamilyOutputFormat.java | 164 -----
.../AbstractColumnFamilyRecordWriter.java | 193 -----
.../cassandra/hadoop/BulkOutputFormat.java | 51 +-
.../cassandra/hadoop/BulkRecordWriter.java | 145 +++-
.../hadoop/ColumnFamilyInputFormat.java | 47 +-
.../hadoop/ColumnFamilyOutputFormat.java | 119 +++-
.../hadoop/ColumnFamilyRecordReader.java | 1 +
.../hadoop/ColumnFamilyRecordWriter.java | 124 +++-
.../hadoop/cql3/CqlBulkOutputFormat.java | 81 ++-
.../hadoop/cql3/CqlBulkRecordWriter.java | 223 ++++--
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 52 +-
.../cassandra/hadoop/cql3/CqlOutputFormat.java | 76 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 93 ++-
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 392 ++++++----
.../cassandra/hadoop/pig/CassandraStorage.java | 706 +++++++++++++++++--
.../cassandra/hadoop/pig/CqlNativeStorage.java | 629 +++++++++++------
.../cassandra/hadoop/pig/StorageHelper.java | 121 ++++
.../cassandra/io/sstable/SSTableLoader.java | 13 +-
.../cassandra/service/StorageService.java | 10 +-
.../org/apache/cassandra/tools/BulkLoader.java | 209 ++----
.../utils/NativeSSTableLoaderClient.java | 126 ++++
.../org/apache/cassandra/pig/CqlTableTest.java | 81 +--
.../io/sstable/CQLSSTableWriterTest.java | 14 +-
.../cassandra/io/sstable/SSTableLoaderTest.java | 7 +-
33 files changed, 2678 insertions(+), 1639 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8f89ece..ab92aa0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Remove Thrift dependencies in bundled tools (CASSANDRA-8358)
* Disable memory mapping of hsperfdata file for JVM statistics (CASSANDRA-9242)
* Add pre-startup checks to detect potential incompatibilities (CASSANDRA-8049)
* Distinguish between null and unset in protocol v4 (CASSANDRA-7304)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 03008de..32351a1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -75,6 +75,9 @@ New features
Upgrading
---------
- Pig's CqlStorage has been removed, use CqlNativeStorage instead
+ - Pig's CassandraStorage has been deprecated. CassandraStorage
+ should only be used against tables created via Thrift.
+ Use CqlNativeStorage for all other tables.
- IAuthenticator has been updated to remove responsibility for user/role
maintenance and is now solely responsible for validating credentials.
This is primarily done via SASL, though an optional method exists for
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index ba99cd9..a5f195f 100644
--- a/build.xml
+++ b/build.xml
@@ -381,7 +381,7 @@
<dependency groupId="io.netty" artifactId="netty-all" version="4.0.23.Final" />
<dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
<dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
- <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.2" />
+ <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.5" classifier="shaded" />
<dependency groupId="org.javassist" artifactId="javassist" version="3.18.2-GA" />
<dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.3.4" />
<dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
@@ -433,7 +433,7 @@
<dependency groupId="org.apache.pig" artifactId="pig"/>
<dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
<dependency groupId="org.antlr" artifactId="antlr"/>
- <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/>
+ <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
<dependency groupId="org.javassist" artifactId="javassist"/>
<dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
<dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/>
@@ -501,12 +501,12 @@
<dependency groupId="org.apache.thrift" artifactId="libthrift"/>
<dependency groupId="org.apache.cassandra" artifactId="cassandra-thrift"/>
-
+
<!-- don't need hadoop classes to run, but if you use the hadoop stuff -->
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
<dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>
- <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" optional="true"/>
+ <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded" optional="true"/>
<!-- don't need jna to run, but nice to have -->
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/lib/cassandra-driver-core-2.1.2.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-2.1.2.jar b/lib/cassandra-driver-core-2.1.2.jar
deleted file mode 100644
index 2095c05..0000000
Binary files a/lib/cassandra-driver-core-2.1.2.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/lib/cassandra-driver-core-2.1.5-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-2.1.5-shaded.jar b/lib/cassandra-driver-core-2.1.5-shaded.jar
new file mode 100644
index 0000000..bb83fb5
Binary files /dev/null and b/lib/cassandra-driver-core-2.1.5-shaded.jar differ
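The switch to the shaded classifier matters because the bundled tools now run the DataStax Java driver in the same JVM as Cassandra's own io.netty dependency; the shaded jar relocates the driver's bundled Netty so the two versions cannot collide. A quick classpath probe, written as a sketch on the assumption that the 2.1 shaded builds relocate Netty under com.datastax.shaded:

    // Sketch: verify the shaded driver jar is on the classpath.
    // Assumption: the shaded build relocates Netty under com.datastax.shaded.
    public class ShadedDriverProbe
    {
        public static void main(String[] args) throws ClassNotFoundException
        {
            Class.forName("com.datastax.driver.core.Cluster");          // public driver API is untouched
            Class.forName("com.datastax.shaded.netty.channel.Channel"); // relocated Netty copy
            System.out.println("shaded driver present; no clash with Cassandra's io.netty");
        }
    }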
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/db/marshal/TypeParser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TypeParser.java b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
index ad7ffed..faa678e 100644
--- a/src/java/org/apache/cassandra/db/marshal/TypeParser.java
+++ b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
@@ -21,13 +21,9 @@ import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -42,7 +38,7 @@ public class TypeParser
private int idx;
// A cache of parsed string, specially useful for DynamicCompositeType
- private static final Map<String, AbstractType<?>> cache = new HashMap<String, AbstractType<?>>();
+ private static final Map<String, AbstractType<?>> cache = new HashMap<>();
public static final TypeParser EMPTY_PARSER = new TypeParser("", 0);
@@ -98,9 +94,48 @@ public class TypeParser
return parse(compareWith == null ? null : compareWith.toString());
}
- public static String getShortName(AbstractType<?> type)
+ public static String parseCqlNativeType(String str)
{
- return type.getClass().getSimpleName();
+ return CQL3Type.Native.valueOf(str.trim().toUpperCase(Locale.ENGLISH)).getType().toString();
+ }
+
+ public static String parseCqlCollectionOrFrozenType(String str) throws SyntaxException
+ {
+ str = str.trim().toLowerCase();
+ switch (str)
+ {
+ case "map": return "MapType";
+ case "set": return "SetType";
+ case "list": return "ListType";
+ case "frozen": return "FrozenType";
+ default: throw new SyntaxException("Invalid type name " + str);
+ }
+ }
+
+ /**
+ * Turns user-facing type names into AbstractTypes, e.g. 'text' -> UTF8Type
+ */
+ public static AbstractType<?> parseCqlName(String str) throws SyntaxException, ConfigurationException
+ {
+ return parse(parseCqlNameRecurse(str));
+ }
+
+ private static String parseCqlNameRecurse(String str) throws SyntaxException
+ {
+ if (str.indexOf(',') >= 0 && (!str.contains("<") || (str.indexOf(',') < str.indexOf('<'))))
+ {
+ String[] parseString = str.split(",", 2);
+ return parseCqlNameRecurse(parseString[0]) + "," + parseCqlNameRecurse(parseString[1]);
+ }
+ else if (str.contains("<"))
+ {
+ String[] parseString = str.trim().split("<", 2);
+ return parseCqlCollectionOrFrozenType(parseString[0]) + "(" + parseCqlNameRecurse(parseString[1].substring(0, parseString[1].length()-1)) + ")";
+ }
+ else
+ {
+ return parseCqlNativeType(str);
+ }
}
/**
@@ -126,7 +161,7 @@ public class TypeParser
if (str.charAt(idx) != '(')
throw new IllegalStateException();
- Map<String, String> map = new HashMap<String, String>();
+ Map<String, String> map = new HashMap<>();
++idx; // skipping '('
while (skipBlankAndComma())
@@ -157,7 +192,7 @@ public class TypeParser
public List<AbstractType<?>> getTypeParameters() throws SyntaxException, ConfigurationException
{
- List<AbstractType<?>> list = new ArrayList<AbstractType<?>>();
+ List<AbstractType<?>> list = new ArrayList<>();
if (isEOS())
return list;
@@ -191,7 +226,7 @@ public class TypeParser
public Map<Byte, AbstractType<?>> getAliasParameters() throws SyntaxException, ConfigurationException
{
- Map<Byte, AbstractType<?>> map = new HashMap<Byte, AbstractType<?>>();
+ Map<Byte, AbstractType<?>> map = new HashMap<>();
if (isEOS())
return map;
@@ -384,11 +419,7 @@ public class TypeParser
Field field = typeClass.getDeclaredField("instance");
return (AbstractType<?>) field.get(null);
}
- catch (NoSuchFieldException e)
- {
- throw new ConfigurationException("Invalid comparator class " + typeClass.getName() + ": must define a public static instance field or a public static method getInstance(TypeParser).");
- }
- catch (IllegalAccessException e)
+ catch (NoSuchFieldException | IllegalAccessException e)
{
throw new ConfigurationException("Invalid comparator class " + typeClass.getName() + ": must define a public static instance field or a public static method getInstance(TypeParser).");
}
@@ -489,12 +520,6 @@ public class TypeParser
return str.substring(i, idx);
}
- public char readNextChar()
- {
- skipBlank();
- return str.charAt(idx++);
- }
-
/**
* Helper function to ease the writing of AbstractType.toString() methods.
*/
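The new parseCqlName path converts a user-facing CQL type string into the internal notation understood by parse(): split on top-level commas first, then recurse into each <...> parameter list, mapping collection names to MapType/SetType/ListType/FrozenType and native names through CQL3Type.Native. A self-contained sketch of that recursion; the nativeType lookup below is a simplified stand-in for CQL3Type.Native covering only three names (the real code emits fully qualified marshal class names):

    import java.util.Locale;

    public class CqlTypeNameSketch
    {
        // Stand-in for CQL3Type.Native.valueOf(...).getType().toString()
        static String nativeType(String s)
        {
            switch (s.trim().toLowerCase(Locale.ENGLISH))
            {
                case "text": return "UTF8Type";
                case "int":  return "Int32Type";
                case "uuid": return "UUIDType";
                default: throw new IllegalArgumentException("unknown native type " + s);
            }
        }

        // Mirrors parseCqlCollectionOrFrozenType
        static String collectionType(String s)
        {
            switch (s.trim().toLowerCase(Locale.ENGLISH))
            {
                case "map":    return "MapType";
                case "set":    return "SetType";
                case "list":   return "ListType";
                case "frozen": return "FrozenType";
                default: throw new IllegalArgumentException("invalid type name " + s);
            }
        }

        // Mirrors parseCqlNameRecurse: top-level commas first, then <...> recursion
        static String recurse(String str)
        {
            if (str.indexOf(',') >= 0 && (!str.contains("<") || str.indexOf(',') < str.indexOf('<')))
            {
                String[] parts = str.split(",", 2);
                return recurse(parts[0]) + "," + recurse(parts[1]);
            }
            else if (str.contains("<"))
            {
                String[] parts = str.trim().split("<", 2);
                return collectionType(parts[0]) + "(" + recurse(parts[1].substring(0, parts[1].length() - 1)) + ")";
            }
            return nativeType(str);
        }

        public static void main(String[] args)
        {
            System.out.println(recurse("map<text, int>"));     // MapType(UTF8Type,Int32Type)
            System.out.println(recurse("frozen<list<uuid>>")); // FrozenType(ListType(UUIDType))
        }
    }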
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
deleted file mode 100644
index c0e91da..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
-
-public abstract class AbstractBulkOutputFormat<K, V> extends OutputFormat<K, V>
- implements org.apache.hadoop.mapred.OutputFormat<K, V>
-{
- @Override
- public void checkOutputSpecs(JobContext context)
- {
- checkOutputSpecs(HadoopCompat.getConfiguration(context));
- }
-
- private void checkOutputSpecs(Configuration conf)
- {
- if (ConfigHelper.getOutputKeyspace(conf) == null)
- {
- throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
- }
- }
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
- {
- return new NullOutputCommitter();
- }
-
- /** Fills the deprecated OutputFormat interface for streaming. */
- @Deprecated
- public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
- {
- checkOutputSpecs(job);
- }
-
- public static class NullOutputCommitter extends OutputCommitter
- {
- public void abortTask(TaskAttemptContext taskContext) { }
-
- public void cleanupJob(JobContext jobContext) { }
-
- public void commitTask(TaskAttemptContext taskContext) { }
-
- public boolean needsTaskCommit(TaskAttemptContext taskContext)
- {
- return false;
- }
-
- public void setupJob(JobContext jobContext) { }
-
- public void setupTask(TaskAttemptContext taskContext) { }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
deleted file mode 100644
index 5ba0a96..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.SSTableLoader;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.OutputHandler;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.Progressable;
-
-public abstract class AbstractBulkRecordWriter<K, V> extends RecordWriter<K, V>
-implements org.apache.hadoop.mapred.RecordWriter<K, V>
-{
- public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
- public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
- public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
- public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
-
- private final Logger logger = LoggerFactory.getLogger(AbstractBulkRecordWriter.class);
-
- protected final Configuration conf;
- protected final int maxFailures;
- protected final int bufferSize;
- protected Closeable writer;
- protected SSTableLoader loader;
- protected Progressable progress;
- protected TaskAttemptContext context;
-
- protected AbstractBulkRecordWriter(TaskAttemptContext context)
- {
- this(HadoopCompat.getConfiguration(context));
- this.context = context;
- }
-
- protected AbstractBulkRecordWriter(Configuration conf, Progressable progress)
- {
- this(conf);
- this.progress = progress;
- }
-
- protected AbstractBulkRecordWriter(Configuration conf)
- {
- Config.setOutboundBindAny(true);
- this.conf = conf;
- DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
- maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
- bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
- }
-
- protected String getOutputLocation() throws IOException
- {
- String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
- if (dir == null)
- throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
- return dir;
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException
- {
- close();
- }
-
- /** Fills the deprecated RecordWriter interface for streaming. */
- @Deprecated
- public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
- {
- close();
- }
-
- private void close() throws IOException
- {
- if (writer != null)
- {
- writer.close();
- Future<StreamState> future = loader.stream();
- while (true)
- {
- try
- {
- future.get(1000, TimeUnit.MILLISECONDS);
- break;
- }
- catch (ExecutionException | TimeoutException te)
- {
- if (null != progress)
- progress.progress();
- if (null != context)
- HadoopCompat.progress(context);
- }
- catch (InterruptedException e)
- {
- throw new IOException(e);
- }
- }
- if (loader.getFailedHosts().size() > 0)
- {
- if (loader.getFailedHosts().size() > maxFailures)
- throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
- else
- logger.warn("Some hosts failed: {}", loader.getFailedHosts());
- }
- }
- }
-
- public static class ExternalClient extends SSTableLoader.Client
- {
- private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
- private final Configuration conf;
- private final String hostlist;
- private final int rpcPort;
- private final String username;
- private final String password;
-
- public ExternalClient(Configuration conf)
- {
- super();
- this.conf = conf;
- this.hostlist = ConfigHelper.getOutputInitialAddress(conf);
- this.rpcPort = ConfigHelper.getOutputRpcPort(conf);
- this.username = ConfigHelper.getOutputKeyspaceUserName(conf);
- this.password = ConfigHelper.getOutputKeyspacePassword(conf);
- }
-
- public void init(String keyspace)
- {
- String[] nodes = hostlist.split(",");
- Set<InetAddress> hosts = new HashSet<InetAddress>(nodes.length);
- for (String node : nodes)
- {
- try
- {
- hosts.add(InetAddress.getByName(node));
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException(e);
- }
- }
- Iterator<InetAddress> hostiter = hosts.iterator();
- while (hostiter.hasNext())
- {
- try
- {
- InetAddress host = hostiter.next();
- Cassandra.Client client = ConfigHelper.createConnection(conf, host.getHostAddress(), rpcPort);
-
- // log in
- client.set_keyspace(keyspace);
- if (username != null)
- {
- Map<String, String> creds = new HashMap<String, String>();
- creds.put(PasswordAuthenticator.USERNAME_KEY, username);
- creds.put(PasswordAuthenticator.PASSWORD_KEY, password);
- AuthenticationRequest authRequest = new AuthenticationRequest(creds);
- client.login(authRequest);
- }
-
- List<TokenRange> tokenRanges = client.describe_ring(keyspace);
- List<KsDef> ksDefs = client.describe_keyspaces();
-
- setPartitioner(client.describe_partitioner());
- Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
-
- for (TokenRange tr : tokenRanges)
- {
- Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
- for (String ep : tr.endpoints)
- {
- addRangeForEndpoint(range, InetAddress.getByName(ep));
- }
- }
-
- for (KsDef ksDef : ksDefs)
- {
- Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
- for (CfDef cfDef : ksDef.cf_defs)
- cfs.put(cfDef.name, ThriftConversion.fromThrift(cfDef));
- knownCfs.put(ksDef.name, cfs);
- }
- break;
- }
- catch (Exception e)
- {
- if (!hostiter.hasNext())
- throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
- }
- }
- }
-
- public CFMetaData getCFMetaData(String keyspace, String cfName)
- {
- Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
- return cfs != null ? cfs.get(cfName) : null;
- }
- }
-
- public static class NullOutputHandler implements OutputHandler
- {
- public void output(String msg) {}
- public void debug(String msg) {}
- public void warn(String msg) {}
- public void warn(String msg, Throwable th) {}
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 691bd76..2ef4cf4 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -18,31 +18,27 @@
package org.apache.cassandra.hadoop;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.*;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.auth.PasswordAuthenticator;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.hadoop.cql3.*;
+import org.apache.cassandra.thrift.KeyRange;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.TApplicationException;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y>
{
@@ -51,7 +47,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
public static final String MAPRED_TASK_ID = "mapred.task.id";
// The simple fact that we need this is because the old Hadoop API wants us to "write"
// to the key and value whereas the new asks for it.
- // I choose 8kb as the default max key size (instanciated only once), but you can
+ // I choose 8kb as the default max key size (instantiated only once), but you can
// override it in your jobConf with this setting.
public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = "cassandra.hadoop.max_key_size";
public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
@@ -59,6 +55,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
private String keyspace;
private String cfName;
private IPartitioner partitioner;
+ private Session session;
protected void validateConfiguration(Configuration conf)
{
@@ -72,57 +69,27 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner");
}
- public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception
- {
- logger.debug("Creating authenticated client for CF input format");
- TTransport transport;
- try
- {
- transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port);
- }
- catch (Exception e)
- {
- throw new TTransportException("Failed to open a transport to " + location + ":" + port + ".", e);
- }
- TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
- Cassandra.Client client = new Cassandra.Client(binaryProtocol);
-
- // log in
- client.set_keyspace(ConfigHelper.getInputKeyspace(conf));
- if ((ConfigHelper.getInputKeyspaceUserName(conf) != null) && (ConfigHelper.getInputKeyspacePassword(conf) != null))
- {
- Map<String, String> creds = new HashMap<String, String>();
- creds.put(PasswordAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
- creds.put(PasswordAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
- AuthenticationRequest authRequest = new AuthenticationRequest(creds);
- client.login(authRequest);
- }
- logger.debug("Authenticated client for CF input format created successfully");
- return client;
- }
-
public List<InputSplit> getSplits(JobContext context) throws IOException
{
- Configuration conf = HadoopCompat.getConfiguration(context);;
+ Configuration conf = HadoopCompat.getConfiguration(context);
validateConfiguration(conf);
- // cannonical ranges and nodes holding replicas
- List<TokenRange> masterRangeNodes = getRangeMap(conf);
-
keyspace = ConfigHelper.getInputKeyspace(conf);
cfName = ConfigHelper.getInputColumnFamily(conf);
partitioner = ConfigHelper.getInputPartitioner(conf);
logger.debug("partitioner is {}", partitioner);
+ // canonical ranges and nodes holding replicas
+ Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, keyspace);
- // cannonical ranges, split into pieces, fetching the splits in parallel
+ // canonical ranges, split into pieces, fetching the splits in parallel
ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- List<InputSplit> splits = new ArrayList<InputSplit>();
+ List<InputSplit> splits = new ArrayList<>();
try
{
- List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
+ List<Future<List<InputSplit>>> splitfutures = new ArrayList<>();
KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
Range<Token> jobRange = null;
if (jobKeyRange != null)
@@ -130,7 +97,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
if (jobKeyRange.start_key != null)
{
if (!partitioner.preservesOrder())
- throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving paritioner");
+ throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner");
if (jobKeyRange.start_token != null)
throw new IllegalArgumentException("only start_key supported");
if (jobKeyRange.end_token != null)
@@ -149,26 +116,25 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
}
}
- for (TokenRange range : masterRangeNodes)
+ session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
+ Metadata metadata = session.getCluster().getMetadata();
+
+ for (TokenRange range : masterRangeNodes.keySet())
{
if (jobRange == null)
{
- // for each range, pick a live owner and ask it to compute bite-sized splits
- splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+ // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
+ splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf)));
}
else
{
- Range<Token> dhtRange = new Range<Token>(partitioner.getTokenFactory().fromString(range.start_token),
- partitioner.getTokenFactory().fromString(range.end_token));
-
- if (dhtRange.intersects(jobRange))
+ TokenRange jobTokenRange = rangeToTokenRange(metadata, jobRange);
+ if (range.intersects(jobTokenRange))
{
- for (Range<Token> intersection: dhtRange.intersectionWith(jobRange))
+ for (TokenRange intersection: range.intersectWith(jobTokenRange))
{
- range.start_token = partitioner.getTokenFactory().toString(intersection.left);
- range.end_token = partitioner.getTokenFactory().toString(intersection.right);
- // for each range, pick a live owner and ask it to compute bite-sized splits
- splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+ // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
+ splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf)));
}
}
}
@@ -197,53 +163,53 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
return splits;
}
+ private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
+ {
+ return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
+ metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
+ }
+
/**
- * Gets a token range and splits it up according to the suggested
+ * Gets a TokenRange and splits it up according to the suggested
* size into input splits that Hadoop can use.
*/
class SplitCallable implements Callable<List<InputSplit>>
{
- private final TokenRange range;
+ private final TokenRange tokenRange;
+ private final Set<Host> hosts;
private final Configuration conf;
- public SplitCallable(TokenRange tr, Configuration conf)
+ public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf)
{
- this.range = tr;
+ this.tokenRange = tr;
+ this.hosts = hosts;
this.conf = conf;
}
public List<InputSplit> call() throws Exception
{
- ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
- List<CfSplit> subSplits = getSubSplits(keyspace, cfName, range, conf);
- assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size";
+ ArrayList<InputSplit> splits = new ArrayList<>();
+ Map<TokenRange, Long> subSplits;
+ subSplits = getSubSplits(keyspace, cfName, tokenRange, conf);
// turn the sub-ranges into InputSplits
- String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
+ String[] endpoints = new String[hosts.size()];
+
// hadoop needs hostname, not ip
int endpointIndex = 0;
- for (String endpoint: range.rpc_endpoints)
- {
- String endpoint_address = endpoint;
- if (endpoint_address == null || endpoint_address.equals("0.0.0.0"))
- endpoint_address = range.endpoints.get(endpointIndex);
- endpoints[endpointIndex++] = InetAddress.getByName(endpoint_address).getHostName();
- }
+ for (Host endpoint : hosts)
+ endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
- Token.TokenFactory factory = partitioner.getTokenFactory();
- for (CfSplit subSplit : subSplits)
+ for (TokenRange subSplit : subSplits.keySet())
{
- Token left = factory.fromString(subSplit.getStart_token());
- Token right = factory.fromString(subSplit.getEnd_token());
- Range<Token> range = new Range<Token>(left, right);
- List<Range<Token>> ranges = range.isWrapAround() ? range.unwrap() : ImmutableList.of(range);
- for (Range<Token> subrange : ranges)
+ List<TokenRange> ranges = subSplit.unwrap();
+ for (TokenRange subrange : ranges)
{
ColumnFamilySplit split =
new ColumnFamilySplit(
- factory.toString(subrange.left),
- factory.toString(subrange.right),
- subSplit.getRow_count(),
+ subrange.getStart().toString().substring(2),
+ subrange.getEnd().toString().substring(2),
+ subSplits.get(subSplit),
endpoints);
logger.debug("adding {}", split);
@@ -254,80 +220,63 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
}
}
- private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+ private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
{
- int splitsize = ConfigHelper.getInputSplitSize(conf);
- for (int i = 0; i < range.rpc_endpoints.size(); i++)
+ int splitSize = ConfigHelper.getInputSplitSize(conf);
+ try
{
- String host = range.rpc_endpoints.get(i);
-
- if (host == null || host.equals("0.0.0.0"))
- host = range.endpoints.get(i);
-
- try
- {
- Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf));
- client.set_keyspace(keyspace);
-
- try
- {
- return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize);
- }
- catch (TApplicationException e)
- {
- // fallback to guessing split size if talking to a server without describe_splits_ex method
- if (e.getType() == TApplicationException.UNKNOWN_METHOD)
- {
- List<String> splitPoints = client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
- return tokenListToSplits(splitPoints, splitsize);
- }
- throw e;
- }
- }
- catch (IOException e)
- {
- logger.debug("failed connect to endpoint {}", host, e);
- }
- catch (InvalidRequestException e)
- {
- throw new RuntimeException(e);
- }
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
+ return describeSplits(keyspace, cfName, range, splitSize);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
}
- throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
}
- private List<CfSplit> tokenListToSplits(List<String> splitTokens, int splitsize)
+ private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
{
- List<CfSplit> splits = Lists.newArrayListWithExpectedSize(splitTokens.size() - 1);
- for (int j = 0; j < splitTokens.size() - 1; j++)
- splits.add(new CfSplit(splitTokens.get(j), splitTokens.get(j + 1), splitsize));
- return splits;
+ Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
+
+ Map<TokenRange, Set<Host>> map = new HashMap<>();
+ Metadata metadata = session.getCluster().getMetadata();
+ for (TokenRange tokenRange : metadata.getTokenRanges())
+ map.put(tokenRange, metadata.getReplicas(keyspace, tokenRange));
+ return map;
}
- private List<TokenRange> getRangeMap(Configuration conf) throws IOException
+ private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
{
- Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
-
- List<TokenRange> map;
- try
+ String query = String.format("SELECT mean_partition_size, partitions_count " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
+ SystemKeyspace.NAME,
+ SystemKeyspace.SIZE_ESTIMATES);
+
+ ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString());
+
+ Row row = resultSet.one();
+ // If we have no data on this split, return the full split, i.e. do not sub-split
+ // Assume smallest granularity of partition count available from CASSANDRA-7688
+ if (row == null)
{
- map = client.describe_local_ring(ConfigHelper.getInputKeyspace(conf));
+ Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
+ wrappedTokenRange.put(tokenRange, (long) 128);
+ return wrappedTokenRange;
}
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
-
- return map;
+
+ long meanPartitionSize = row.getLong("mean_partition_size");
+ long partitionCount = row.getLong("partitions_count");
+
+ int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize);
+ List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
+ Map<TokenRange, Long> rangesWithLength = new HashMap<>();
+ for (TokenRange range : splitRanges)
+ rangesWithLength.put(range, partitionCount/splitCount);
+
+ return rangesWithLength;
}
- //
// Old Hadoop API
- //
public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
{
TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID());
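getSubSplits no longer falls back through Thrift's describe_splits_ex; it derives sub-splits from the mean_partition_size and partitions_count columns of system.size_estimates (populated since CASSANDRA-7688), defaulting to 128 partitions when no estimate exists for a range. A minimal sketch of the same query over the native protocol; the contact point, keyspace, table, token bounds, and split size below are placeholders:

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;

    public class SizeEstimatesSketch
    {
        public static void main(String[] args)
        {
            try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
                 Session session = cluster.connect())
            {
                Row row = session.execute(
                    "SELECT mean_partition_size, partitions_count FROM system.size_estimates " +
                    "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
                    "my_ks", "my_table", "-9223372036854775808", "-4611686018427387904").one();

                if (row == null)
                {
                    // same situation as the row == null branch above: no estimate, do not sub-split
                    System.out.println("no size estimate for this range");
                    return;
                }

                long estimatedBytes = row.getLong("mean_partition_size") * row.getLong("partitions_count");
                int splitCount = (int) (estimatedBytes / 65536); // 65536 = placeholder input split size
                System.out.println("would split the range into " + splitCount + " pieces");
            }
        }
    }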
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
deleted file mode 100644
index 03d0045..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.thrift.AuthenticationRequest;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-
-/**
- * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
- * OutputFormat that allows reduce tasks to store keys (and corresponding
- * values) as Cassandra rows (and respective columns) in a given
- * ColumnFamily.
- *
- * <p>
- * As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
- * Keyspace and ColumnFamily in your
- * Hadoop job Configuration. The {@link ConfigHelper} class, through its
- * {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this
- * simple.
- * </p>
- *
- * <p>
- * For the sake of performance, this class employs a lazy write-back caching
- * mechanism, where its record writer batches mutations created based on the
- * reduce's inputs (in a task-specific map), and periodically makes the changes
- * official by sending a batch mutate request to Cassandra.
- * </p>
- * @param <Y>
- */
-public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputFormat<K, Y> implements org.apache.hadoop.mapred.OutputFormat<K, Y>
-{
- public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
- public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size";
- private static final Logger logger = LoggerFactory.getLogger(AbstractColumnFamilyOutputFormat.class);
-
-
- /**
- * Check for validity of the output-specification for the job.
- *
- * @param context
- * information about the job
- */
- public void checkOutputSpecs(JobContext context)
- {
- checkOutputSpecs(HadoopCompat.getConfiguration(context));
- }
-
- protected void checkOutputSpecs(Configuration conf)
- {
- if (ConfigHelper.getOutputKeyspace(conf) == null)
- throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
- if (ConfigHelper.getOutputPartitioner(conf) == null)
- throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
- if (ConfigHelper.getOutputInitialAddress(conf) == null)
- throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
- }
-
- /** Fills the deprecated OutputFormat interface for streaming. */
- @Deprecated
- public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
- {
- checkOutputSpecs(job);
- }
-
- /**
- * The OutputCommitter for this format does not write any data to the DFS.
- *
- * @param context
- * the task context
- * @return an output committer
- * @throws IOException
- * @throws InterruptedException
- */
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
- {
- return new NullOutputCommitter();
- }
-
- /**
- * Connects to the given server:port and returns a client based on the given socket that points to the configured
- * keyspace, and is logged in with the configured credentials.
- *
- * @param host fully qualified host name to connect to
- * @param port RPC port of the server
- * @param conf a job configuration
- * @return a cassandra client
- * @throws Exception set of thrown exceptions may be implementation defined,
- * depending on the used transport factory
- */
- public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
- {
- logger.debug("Creating authenticated client for CF output format");
- TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port);
- TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
- Cassandra.Client client = new Cassandra.Client(binaryProtocol);
- client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
- String user = ConfigHelper.getOutputKeyspaceUserName(conf);
- String password = ConfigHelper.getOutputKeyspacePassword(conf);
- if ((user != null) && (password != null))
- login(user, password, client);
-
- logger.debug("Authenticated client for CF output format created successfully");
- return client;
- }
-
- public static void login(String user, String password, Cassandra.Client client) throws Exception
- {
- Map<String, String> creds = new HashMap<String, String>();
- creds.put(PasswordAuthenticator.USERNAME_KEY, user);
- creds.put(PasswordAuthenticator.PASSWORD_KEY, password);
- AuthenticationRequest authRequest = new AuthenticationRequest(creds);
- client.login(authRequest);
- }
-
- /**
- * An {@link OutputCommitter} that does nothing.
- */
- private static class NullOutputCommitter extends OutputCommitter
- {
- public void abortTask(TaskAttemptContext taskContext) { }
-
- public void cleanupJob(JobContext jobContext) { }
-
- public void commitTask(TaskAttemptContext taskContext) { }
-
- public boolean needsTaskCommit(TaskAttemptContext taskContext)
- {
- return false;
- }
-
- public void setupJob(JobContext jobContext) { }
-
- public void setupTask(TaskAttemptContext taskContext) { }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
deleted file mode 100644
index cb44beb..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.client.RingCache;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.thrift.transport.TTransport;
-import org.apache.hadoop.util.Progressable;
-
-
-/**
- * The <code>ColumnFamilyRecordWriter</code> maps the output <key, value>
- * pairs to a Cassandra column family. In particular, it applies all mutations
- * in the value, which it associates with the key, and in turn the responsible
- * endpoint.
- *
- * <p>
- * Furthermore, this writer groups the mutations by the endpoint responsible for
- * the rows being affected. This allows the mutations to be executed in parallel,
- * directly to a responsible endpoint.
- * </p>
- *
- * @see ColumnFamilyOutputFormat
- */
-public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWriter<K, Y> implements org.apache.hadoop.mapred.RecordWriter<K, Y>
-{
- // The configuration this writer is associated with.
- protected final Configuration conf;
-
- // The ring cache that describes the token ranges each node in the ring is
- // responsible for. This is what allows us to group the mutations by
- // the endpoints they should be targeted at. The targeted endpoint
- // essentially
- // acts as the primary replica for the rows being affected by the mutations.
- protected final RingCache ringCache;
-
- // The number of mutations to buffer per endpoint
- protected final int queueSize;
-
- protected final long batchThreshold;
-
- protected final ConsistencyLevel consistencyLevel;
- protected Progressable progressable;
- protected TaskAttemptContext context;
-
- protected AbstractColumnFamilyRecordWriter(Configuration conf)
- {
- this.conf = conf;
- this.ringCache = new RingCache(conf);
- this.queueSize = conf.getInt(AbstractColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
- batchThreshold = conf.getLong(AbstractColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
- consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
- }
-
- /**
- * Close this <code>RecordWriter</code> to future operations, but not before
- * flushing out the batched mutations.
- *
- * @param context the context of the task
- * @throws IOException
- */
- public void close(TaskAttemptContext context) throws IOException, InterruptedException
- {
- close();
- }
-
- /** Fills the deprecated RecordWriter interface for streaming. */
- @Deprecated
- public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
- {
- close();
- }
-
- protected abstract void close() throws IOException;
-
- /**
- * A client that runs in a threadpool and connects to the list of endpoints for a particular
- * range. Mutations for keys in that range are sent to this client via a queue.
- */
- public abstract class AbstractRangeClient<K> extends Thread
- {
- // The list of endpoints for this range
- protected final List<InetAddress> endpoints;
- // A bounded queue of incoming mutations for this range
- protected final BlockingQueue<K> queue = new ArrayBlockingQueue<K>(queueSize);
-
- protected volatile boolean run = true;
- // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
- // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls,
- // when the client is closed.
- protected volatile IOException lastException;
-
- protected Cassandra.Client client;
-
- /**
- * Constructs an {@link AbstractRangeClient} for the given endpoints.
- * @param endpoints the possible endpoints to execute the mutations on
- */
- public AbstractRangeClient(List<InetAddress> endpoints)
- {
- super("client-" + endpoints);
- this.endpoints = endpoints;
- }
-
- /**
- * enqueues the given value to Cassandra
- */
- public void put(K value) throws IOException
- {
- while (true)
- {
- if (lastException != null)
- throw lastException;
- try
- {
- if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
- break;
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- }
- }
-
- public void close() throws IOException
- {
- // stop the run loop. this will result in closeInternal being called by the time join() finishes.
- run = false;
- interrupt();
- try
- {
- this.join();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
-
- if (lastException != null)
- throw lastException;
- }
-
- protected void closeInternal()
- {
- if (client != null)
- {
- TTransport transport = client.getOutputProtocol().getTransport();
- if (transport.isOpen())
- transport.close();
- }
- }
-
- /**
- * Loops collecting mutations from the queue and sending to Cassandra
- */
- public abstract void run();
-
- @Override
- public String toString()
- {
- return "#<Client for " + endpoints + ">";
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index f5a5a8d..5282279 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -23,9 +23,12 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.thrift.Mutation;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
-public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<Mutation>>
+@Deprecated
+public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
+ implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
{
/** Fills the deprecated OutputFormat interface for streaming. */
@Deprecated
@@ -39,4 +42,50 @@ public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<M
{
return new BulkRecordWriter(context);
}
+
+
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ {
+ checkOutputSpecs(HadoopCompat.getConfiguration(context));
+ }
+
+ private void checkOutputSpecs(Configuration conf)
+ {
+ if (ConfigHelper.getOutputKeyspace(conf) == null)
+ {
+ throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
+ }
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new NullOutputCommitter();
+ }
+
+ /** Fills the deprecated OutputFormat interface for streaming. */
+ @Deprecated
+ public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+ {
+ checkOutputSpecs(job);
+ }
+
+ public static class NullOutputCommitter extends OutputCommitter
+ {
+ public void abortTask(TaskAttemptContext taskContext) { }
+
+ public void cleanupJob(JobContext jobContext) { }
+
+ public void commitTask(TaskAttemptContext taskContext) { }
+
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ {
+ return false;
+ }
+
+ public void setupJob(JobContext jobContext) { }
+
+ public void setupTask(TaskAttemptContext taskContext) { }
+ }
}
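Because BulkOutputFormat writes SSTables to a local directory and streams them straight to the cluster, there is nothing on the DFS for Hadoop to commit, hence the no-op NullOutputCommitter. A minimal wiring sketch for a job using this (now deprecated) format; keyspace, table, and address are placeholders:

    import org.apache.cassandra.hadoop.BulkOutputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class BulkJobSetup
    {
        public static void main(String[] args) throws Exception
        {
            Configuration conf = new Configuration();
            ConfigHelper.setOutputColumnFamily(conf, "my_ks", "my_table"); // satisfies checkOutputSpecs()
            ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
            ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner");

            Job job = new Job(conf, "bulk-load"); // Job.getInstance(conf) on newer Hadoop
            job.setOutputFormatClass(BulkOutputFormat.class);
            // mapper/reducer classes and input setup omitted
        }
    }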
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index d67b856..6b9ecb5 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -17,24 +17,57 @@
*/
package org.apache.cassandra.hadoop;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
+import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.CounterColumn;
import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.utils.NativeSSTableLoaderClient;
+import org.apache.cassandra.utils.OutputHandler;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
-public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, List<Mutation>>
+@Deprecated
+public final class BulkRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>>
+ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>>
{
+ public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+ public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+ public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+ public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+
+ private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
+
+ protected final Configuration conf;
+ protected final int maxFailures;
+ protected final int bufferSize;
+ protected Closeable writer;
+ protected SSTableLoader loader;
+ protected Progressable progress;
+ protected TaskAttemptContext context;
private File outputDir;
@@ -55,17 +88,32 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer,
BulkRecordWriter(TaskAttemptContext context)
{
- super(context);
+
+ this(HadoopCompat.getConfiguration(context));
+ this.context = context;
}
BulkRecordWriter(Configuration conf, Progressable progress)
{
- super(conf, progress);
+ this(conf);
+ this.progress = progress;
}
BulkRecordWriter(Configuration conf)
{
- super(conf);
+ Config.setOutboundBindAny(true);
+ this.conf = conf;
+ DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
+ maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
+ bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
+ }
+
+ protected String getOutputLocation() throws IOException
+ {
+ String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
+ if (dir == null)
+ throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
+ return dir;
}
private void setTypes(Mutation mutation)
@@ -115,6 +163,54 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer,
}
@Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ close();
+ }
+
+ /** Fulfills the deprecated RecordWriter interface for streaming. */
+ @Deprecated
+ public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+ {
+ close();
+ }
+
+ private void close() throws IOException
+ {
+ if (writer != null)
+ {
+ writer.close();
+ Future<StreamState> future = loader.stream();
+ while (true)
+ {
+ try
+ {
+ future.get(1000, TimeUnit.MILLISECONDS);
+ break;
+ }
+ catch (ExecutionException | TimeoutException te)
+ {
+ if (null != progress)
+ progress.progress();
+ if (null != context)
+ HadoopCompat.progress(context);
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
+ }
+ }
+ if (loader.getFailedHosts().size() > 0)
+ {
+ if (loader.getFailedHosts().size() > maxFailures)
+ throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
+ else
+ logger.warn("Some hosts failed: {}", loader.getFailedHosts());
+ }
+ }
+ }
+
+ @Override
public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException
{
setTypes(value.get(0));
@@ -158,4 +254,43 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer,
HadoopCompat.progress(context);
}
}
+
+ public static class ExternalClient extends NativeSSTableLoaderClient
+ {
+ public ExternalClient(Configuration conf)
+ {
+ super(resolveHostAddresses(conf),
+ CqlConfigHelper.getOutputNativePort(conf),
+ ConfigHelper.getOutputKeyspaceUserName(conf),
+ ConfigHelper.getOutputKeyspacePassword(conf),
+ CqlConfigHelper.getSSLOptions(conf).orNull());
+ }
+
+ private static Collection<InetAddress> resolveHostAddresses(Configuration conf)
+ {
+ Set<InetAddress> addresses = new HashSet<>();
+
+ for (String host : ConfigHelper.getOutputInitialAddress(conf).split(","))
+ {
+ try
+ {
+ addresses.add(InetAddress.getByName(host));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return addresses;
+ }
+ }
+
+ public static class NullOutputHandler implements OutputHandler
+ {
+ public void output(String msg) {}
+ public void debug(String msg) {}
+ public void warn(String msg) {}
+ public void warn(String msg, Throwable th) {}
+ }
}
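The four configuration keys declared at the top of this class are ordinary Hadoop properties, so they can be tuned per job without code changes. A hedged sketch; all values are illustrative overrides of the defaults visible in the constructor above:

    Configuration conf = job.getConfiguration();
    conf.set(BulkRecordWriter.OUTPUT_LOCATION, "/tmp/bulk-sstables"); // otherwise falls back to java.io.tmpdir
    conf.set(BulkRecordWriter.BUFFER_SIZE_IN_MB, "128");              // SSTable write buffer; default 64
    conf.set(BulkRecordWriter.STREAM_THROTTLE_MBITS, "400");          // default 0, i.e. unthrottled
    conf.set(BulkRecordWriter.MAX_FAILED_HOSTS, "1");                 // default 0: any failed host fails the task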
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 686d486..88dd2e2 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -21,11 +21,23 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.auth.PasswordAuthenticator;
import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.thrift.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
/**
* Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
@@ -44,9 +56,40 @@ import org.apache.hadoop.mapreduce.*;
*
* The default split size is 64k rows.
*/
+@Deprecated
public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Cell>>
{
-
+ private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
+
+ public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception
+ {
+ logger.debug("Creating authenticated client for CF input format");
+ TTransport transport;
+ try
+ {
+ transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port);
+ }
+ catch (Exception e)
+ {
+ throw new TTransportException("Failed to open a transport to " + location + ":" + port + ".", e);
+ }
+ TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
+ Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+
+ // log in
+ client.set_keyspace(ConfigHelper.getInputKeyspace(conf));
+ if ((ConfigHelper.getInputKeyspaceUserName(conf) != null) && (ConfigHelper.getInputKeyspacePassword(conf) != null))
+ {
+ Map<String, String> creds = new HashMap<String, String>();
+ creds.put(PasswordAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
+ creds.put(PasswordAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
+ AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+ client.login(authRequest);
+ }
+ logger.debug("Authenticated client for CF input format created successfully");
+ return client;
+ }
+
public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
{
return new ColumnFamilyRecordReader();
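Note that createAuthenticatedClient() above only issues a login when both a username and a password are present in the configuration. A hedged sketch of supplying them via ConfigHelper; the address, port, keyspace, table, and credentials are placeholders:

    Configuration conf = job.getConfiguration();
    ConfigHelper.setInputInitialAddress(conf, "127.0.0.1");              // placeholder node address
    ConfigHelper.setInputRpcPort(conf, "9160");                          // default Thrift port
    ConfigHelper.setInputColumnFamily(conf, "my_keyspace", "my_table");  // placeholder keyspace/table
    // Both values must be set, or the client is returned without logging in.
    ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, "hadoop_user", "hadoop_pass");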
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index 2990bf3..94ced69 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -18,10 +18,18 @@
package org.apache.cassandra.hadoop;
+import java.io.*;
import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.*;
+
+import org.slf4j.*;
+
+import org.apache.cassandra.auth.*;
import org.apache.cassandra.thrift.*;
+import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapreduce.*;
+import org.apache.thrift.protocol.*;
+import org.apache.thrift.transport.*;
/**
* The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
@@ -44,8 +52,93 @@ import org.apache.hadoop.mapreduce.*;
* official by sending a batch mutate request to Cassandra.
* </p>
*/
-public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<ByteBuffer,List<Mutation>>
+@Deprecated
+public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
+ implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
{
+ public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
+ public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size";
+
+ private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
+
+ /**
+ * The OutputCommitter for this format does not write any data to the DFS.
+ *
+ * @param context
+ * the task context
+ * @return an output committer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new NullOutputCommitter();
+ }
+
+ /**
+ * Check for validity of the output-specification for the job.
+ *
+ * @param context
+ * information about the job
+ */
+ public void checkOutputSpecs(JobContext context)
+ {
+ checkOutputSpecs(HadoopCompat.getConfiguration(context));
+ }
+
+ protected void checkOutputSpecs(Configuration conf)
+ {
+ if (ConfigHelper.getOutputKeyspace(conf) == null)
+ throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
+ if (ConfigHelper.getOutputPartitioner(conf) == null)
+ throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
+ if (ConfigHelper.getOutputInitialAddress(conf) == null)
+ throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
+ }
+
+ /** Fulfills the deprecated OutputFormat interface for streaming. */
+ @Deprecated
+ public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+ {
+ checkOutputSpecs(job);
+ }
+
+ /**
+ * Connects to the given server:port and returns a client that is bound to the configured
+ * keyspace and logged in with the configured credentials.
+ *
+ * @param host fully qualified host name to connect to
+ * @param port RPC port of the server
+ * @param conf a job configuration
+ * @return a Cassandra client
+ * @throws Exception the set of thrown exceptions is implementation-defined,
+ * depending on the transport factory used
+ */
+ public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
+ {
+ logger.debug("Creating authenticated client for CF output format");
+ TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port);
+ TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
+ Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+ client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
+ String user = ConfigHelper.getOutputKeyspaceUserName(conf);
+ String password = ConfigHelper.getOutputKeyspacePassword(conf);
+ if ((user != null) && (password != null))
+ login(user, password, client);
+
+ logger.debug("Authenticated client for CF output format created successfully");
+ return client;
+ }
+
+ public static void login(String user, String password, Cassandra.Client client) throws Exception
+ {
+ Map<String, String> creds = new HashMap<String, String>();
+ creds.put(PasswordAuthenticator.USERNAME_KEY, user);
+ creds.put(PasswordAuthenticator.PASSWORD_KEY, password);
+ AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+ client.login(authRequest);
+ }
+
/** Fulfills the deprecated OutputFormat interface for streaming. */
@Deprecated
public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress)
@@ -64,4 +157,26 @@ public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<B
{
return new ColumnFamilyRecordWriter(context);
}
+
+ /**
+ * An {@link OutputCommitter} that does nothing.
+ */
+ private static class NullOutputCommitter extends OutputCommitter
+ {
+ public void abortTask(TaskAttemptContext taskContext) { }
+
+ public void cleanupJob(JobContext jobContext) { }
+
+ public void commitTask(TaskAttemptContext taskContext) { }
+
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ {
+ return false;
+ }
+
+ public void setupJob(JobContext jobContext) { }
+
+ public void setupTask(TaskAttemptContext taskContext) { }
+ }
+
}
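Because createAuthenticatedClient() is public and static, it can also be used outside a record writer, e.g. for a one-off metadata call. A hedged sketch under the assumption that the caller owns and closes the transport (host and port are placeholders):

    Cassandra.Client client = ColumnFamilyOutputFormat.createAuthenticatedClient("127.0.0.1", 9160, conf);
    try
    {
        String thriftVersion = client.describe_version(); // any call works; the client is already logged in
        // ... use the client and thriftVersion ...
    }
    finally
    {
        TTransport transport = client.getOutputProtocol().getTransport();
        if (transport.isOpen())
            transport.close();
    }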
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 35437e9..d205f13 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
+@Deprecated
public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
{
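Since the reader implements both the mapreduce and the deprecated mapred interfaces, legacy jobs can keep driving it through the old API. A hedged sketch of the old-style read loop; obtaining the reader from the framework and the per-row process() call are placeholders, not part of this patch:

    org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> reader =
        inputFormat.getRecordReader(split, jobConf, reporter); // supplied by the mapred framework
    ByteBuffer key = reader.createKey();
    SortedMap<ByteBuffer, Cell> value = reader.createValue();
    while (reader.next(key, value))
        process(key, value); // placeholder for per-row work
    reader.close();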
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index d6a873b..31c7047 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -22,15 +22,19 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.*;
+import org.apache.cassandra.client.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.utils.*;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.*;
import org.apache.thrift.TException;
import org.apache.hadoop.util.Progressable;
+import org.apache.thrift.transport.*;
/**
@@ -47,10 +51,30 @@ import org.apache.hadoop.util.Progressable;
*
* @see ColumnFamilyOutputFormat
*/
-final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<ByteBuffer, List<Mutation>>
+@Deprecated
+final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>> implements
+ org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>>
{
+ // The configuration this writer is associated with.
+ protected final Configuration conf;
+
+ // The number of mutations to buffer per endpoint
+ protected final int queueSize;
+
+ protected final long batchThreshold;
+
+ protected final ConsistencyLevel consistencyLevel;
+ protected Progressable progressable;
+ protected TaskAttemptContext context;
// handles for clients for each range running in the threadpool
private final Map<Range, RangeClient> clients;
+
+ // The ring cache that describes the token ranges each node in the ring is
+ // responsible for. This is what allows us to group the mutations by
+ // the endpoints they should be targeted at. The targeted endpoint essentially
+ // acts as the primary replica for the rows being affected by the mutations.
+ private final RingCache ringCache;
/**
* Upon construction, obtain the map that this writer will use to collect
@@ -73,11 +97,33 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
ColumnFamilyRecordWriter(Configuration conf)
{
- super(conf);
+ this.conf = conf;
+ this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
+ batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
+ consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
+ this.ringCache = new RingCache(conf);
this.clients = new HashMap<Range, RangeClient>();
}
-
- @Override
+
+ /**
+ * Close this <code>RecordWriter</code> to future operations, but not before
+ * flushing out the batched mutations.
+ *
+ * @param context the context of the task
+ * @throws IOException
+ */
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ close();
+ }
+
+ /** Fills the deprecated RecordWriter interface for streaming. */
+ @Deprecated
+ public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+ {
+ close();
+ }
+
public void close() throws IOException
{
// close all the clients before throwing anything
@@ -138,8 +184,20 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
* A client that runs in a threadpool and connects to the list of endpoints for a particular
* range. Mutations for keys in that range are sent to this client via a queue.
*/
- public class RangeClient extends AbstractRangeClient<Pair<ByteBuffer, Mutation>>
+ public class RangeClient extends Thread
{
+ // The list of endpoints for this range
+ protected final List<InetAddress> endpoints;
+ // A bounded queue of incoming mutations for this range
+ protected final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<>(queueSize);
+
+ protected volatile boolean run = true;
+ // We want the caller to know if something went wrong, so we record any unrecoverable
+ // exception while writing and rethrow it on the caller's stack at the next put() call,
+ // or, if there are no more put calls, when the client is closed.
+ protected volatile IOException lastException;
+
+ protected Cassandra.Client client;
public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);
/**
@@ -148,8 +206,58 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
*/
public RangeClient(List<InetAddress> endpoints)
{
- super(endpoints);
+ super("client-" + endpoints);
+ this.endpoints = endpoints;
}
+
+ /**
+ * Enqueues the given value for writing to Cassandra.
+ */
+ public void put(Pair<ByteBuffer, Mutation> value) throws IOException
+ {
+ while (true)
+ {
+ if (lastException != null)
+ throw lastException;
+ try
+ {
+ if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
+ break;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+ }
+
+ public void close() throws IOException
+ {
+ // Stop the run loop; this will result in closeInternal() being called by the time join() finishes.
+ run = false;
+ interrupt();
+ try
+ {
+ this.join();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ if (lastException != null)
+ throw lastException;
+ }
+
+ protected void closeInternal()
+ {
+ if (client != null)
+ {
+ TTransport transport = client.getOutputProtocol().getTransport();
+ if (transport.isOpen())
+ transport.close();
+ }
+ }
/**
* Loops collecting mutations from the queue and sending to Cassandra