You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/03/19 19:44:32 UTC
git commit: Add progress reporting to hadoop output formats. Patch by
brandonwilliams, reviewed by goffinet for CASSANDRA-3859
Updated Branches:
refs/heads/cassandra-1.1.0 438acfc8c -> cd806ddd2
Add progress reporting to hadoop output formats.
Patch by brandonwilliams, reviewed by goffinet for CASSANDRA-3859
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cd806ddd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cd806ddd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cd806ddd
Branch: refs/heads/cassandra-1.1.0
Commit: cd806ddd266d69f83f5a5a12283945a7c9fd4e8d
Parents: 438acfc
Author: Brandon Williams <br...@apache.org>
Authored: Mon Mar 19 13:39:58 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Mar 19 13:39:58 2012 -0500
----------------------------------------------------------------------
.../apache/cassandra/hadoop/BulkOutputFormat.java | 2 +-
.../apache/cassandra/hadoop/BulkRecordWriter.java | 33 ++++++++--
.../cassandra/hadoop/ColumnFamilyOutputFormat.java | 2 +-
.../cassandra/hadoop/ColumnFamilyRecordWriter.java | 9 +++
.../org/apache/cassandra/hadoop/Progressable.java | 50 +++++++++++++++
5 files changed, 88 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd806ddd/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index 9902b6b..8fd6f7c 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -67,7 +67,7 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
@Deprecated
public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
{
- return new BulkRecordWriter(job);
+ return new BulkRecordWriter(job, new Progressable(progress));
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd806ddd/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index b247c8d..69be255 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -27,6 +27,8 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.net.UnknownHostException;
import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -57,6 +59,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
private SSTableSimpleUnsortedWriter writer;
private SSTableLoader loader;
private File outputdir;
+ private Progressable progress;
private enum CFType
{
@@ -76,6 +79,14 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
BulkRecordWriter(TaskAttemptContext context) throws IOException
{
this(context.getConfiguration());
+ this.progress = new Progressable(context);
+ }
+
+
+ BulkRecordWriter(Configuration conf, Progressable progress) throws IOException
+ {
+ this(conf);
+ this.progress = progress;
}
BulkRecordWriter(Configuration conf) throws IOException
@@ -169,6 +180,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
writer.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
}
}
+ progress.progress();
}
}
@Override
@@ -189,13 +201,22 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
if (writer != null)
{
writer.close();
- try
+ SSTableLoader.LoaderFuture future = loader.stream();
+ while (true)
{
- loader.stream().get();
- }
- catch (InterruptedException e)
- {
- throw new IOException(e);
+ try
+ {
+ future.get(1000, TimeUnit.MILLISECONDS);
+ break;
+ }
+ catch (TimeoutException te)
+ {
+ progress.progress();
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd806ddd/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index ea2ef11..668c4aa 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -119,7 +119,7 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat
@Deprecated @Override
public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
{
- return new ColumnFamilyRecordWriter(job);
+ return new ColumnFamilyRecordWriter(job, new Progressable(progress));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd806ddd/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index f6d0ed4..5e36a80 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -78,6 +78,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
private final long batchThreshold;
private final ConsistencyLevel consistencyLevel;
+ private Progressable progressable;
/**
@@ -90,6 +91,13 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
ColumnFamilyRecordWriter(TaskAttemptContext context) throws IOException
{
this(context.getConfiguration());
+ this.progressable = new Progressable(context);
+ }
+
+ ColumnFamilyRecordWriter(Configuration conf, Progressable progressable) throws IOException
+ {
+ this(conf);
+ this.progressable = progressable;
}
ColumnFamilyRecordWriter(Configuration conf) throws IOException
@@ -133,6 +141,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
for (Mutation amut : value)
client.put(new Pair<ByteBuffer,Mutation>(keybuff, amut));
+ progressable.progress();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd806ddd/src/java/org/apache/cassandra/hadoop/Progressable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/Progressable.java b/src/java/org/apache/cassandra/hadoop/Progressable.java
new file mode 100644
index 0000000..091a828
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/Progressable.java
@@ -0,0 +1,50 @@
+package org.apache.cassandra.hadoop;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+
+public class Progressable // wraps one of two Hadoop progress-reporting handles behind a single progress() call
+{
+ private TaskAttemptContext context; // assigned only by the TaskAttemptContext constructor; otherwise stays null
+ private org.apache.hadoop.util.Progressable progressable; // assigned only by the util.Progressable constructor
+
+ Progressable(TaskAttemptContext context) // leaves the progressable field unset
+ {
+ this.context = context;
+ }
+
+ Progressable(org.apache.hadoop.util.Progressable progressable) // leaves the context field unset
+ {
+ this.progressable = progressable;
+ }
+
+ public void progress() // forwards the call to the context when one was supplied, else to progressable
+ {
+ if (context != null)
+ context.progress();
+ else
+ progressable.progress(); // NOTE(review): no null check here; throws NullPointerException if both fields are null
+ }
+
+}