Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:22 UTC
[22/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of
hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
deleted file mode 100644
index e18b3aa..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.RegionSizeCalculator;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * A base for {@link MultiTableInputFormat}s. Receives a list of
- * {@link Scan} instances that define the input tables, filters, and so on.
- * Subclasses may use other TableRecordReader implementations.
- */
-@InterfaceAudience.Public
-public abstract class MultiTableInputFormatBase extends
- InputFormat<ImmutableBytesWritable, Result> {
-
- private static final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);
-
- /** Holds the set of scans used to define the input. */
- private List<Scan> scans;
-
- /** The reader scanning the table, can be a custom one. */
- private TableRecordReader tableRecordReader = null;
-
- /**
- * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
- * default.
- *
- * @param split The split to work with.
- * @param context The current context.
- * @return The newly created record reader.
- * @throws IOException When creating the reader fails.
- * @throws InterruptedException when record reader initialization fails
- * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
- * org.apache.hadoop.mapreduce.InputSplit,
- * org.apache.hadoop.mapreduce.TaskAttemptContext)
- */
- @Override
- public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
- InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- TableSplit tSplit = (TableSplit) split;
- LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));
-
- if (tSplit.getTable() == null) {
- throw new IOException("Cannot create a record reader because of a"
-          + " previous error. Please look at the previous log lines from"
- + " the task's full log for more details.");
- }
- final Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
- Table table = connection.getTable(tSplit.getTable());
-
- if (this.tableRecordReader == null) {
- this.tableRecordReader = new TableRecordReader();
- }
- final TableRecordReader trr = this.tableRecordReader;
-
- try {
- Scan sc = tSplit.getScan();
- sc.setStartRow(tSplit.getStartRow());
- sc.setStopRow(tSplit.getEndRow());
- trr.setScan(sc);
- trr.setTable(table);
- return new RecordReader<ImmutableBytesWritable, Result>() {
-
- @Override
- public void close() throws IOException {
- trr.close();
- connection.close();
- }
-
- @Override
- public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
- return trr.getCurrentKey();
- }
-
- @Override
- public Result getCurrentValue() throws IOException, InterruptedException {
- return trr.getCurrentValue();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return trr.getProgress();
- }
-
- @Override
- public void initialize(InputSplit inputsplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
- trr.initialize(inputsplit, context);
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- return trr.nextKeyValue();
- }
- };
- } catch (IOException ioe) {
- // If there is an exception make sure that all
- // resources are closed and released.
- trr.close();
- connection.close();
- throw ioe;
- }
- }
-
- /**
- * Calculates the splits that will serve as input for the map tasks. The
- * number of splits matches the number of regions in a table.
- *
- * @param context The current job context.
- * @return The list of input splits.
- * @throws IOException When creating the list of splits fails.
- * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
- */
- @Override
- public List<InputSplit> getSplits(JobContext context) throws IOException {
- if (scans.isEmpty()) {
- throw new IOException("No scans were provided.");
- }
-
- Map<TableName, List<Scan>> tableMaps = new HashMap<>();
- for (Scan scan : scans) {
- byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
- if (tableNameBytes == null)
- throw new IOException("A scan object did not have a table name");
-
- TableName tableName = TableName.valueOf(tableNameBytes);
-
- List<Scan> scanList = tableMaps.get(tableName);
- if (scanList == null) {
- scanList = new ArrayList<>();
- tableMaps.put(tableName, scanList);
- }
- scanList.add(scan);
- }
-
- List<InputSplit> splits = new ArrayList<>();
-    for (Map.Entry<TableName, List<Scan>> entry : tableMaps.entrySet()) {
- TableName tableName = entry.getKey();
- List<Scan> scanList = entry.getValue();
-
- try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration());
- Table table = conn.getTable(tableName);
- RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
- RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
- regionLocator, conn.getAdmin());
- Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
- for (Scan scan : scanList) {
- if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
- throw new IOException("Expecting at least one region for table : "
- + tableName.getNameAsString());
- }
- int count = 0;
-
- byte[] startRow = scan.getStartRow();
- byte[] stopRow = scan.getStopRow();
-
- for (int i = 0; i < keys.getFirst().length; i++) {
- if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
- continue;
- }
-
- if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
- Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
- (stopRow.length == 0 || Bytes.compareTo(stopRow,
- keys.getFirst()[i]) > 0)) {
- byte[] splitStart = startRow.length == 0 ||
- Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
- keys.getFirst()[i] : startRow;
- byte[] splitStop = (stopRow.length == 0 ||
- Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
- keys.getSecond()[i].length > 0 ?
- keys.getSecond()[i] : stopRow;
-
- HRegionLocation hregionLocation = regionLocator.getRegionLocation(
- keys.getFirst()[i], false);
- String regionHostname = hregionLocation.getHostname();
- HRegionInfo regionInfo = hregionLocation.getRegionInfo();
- String encodedRegionName = regionInfo.getEncodedName();
- long regionSize = sizeCalculator.getRegionSize(
- regionInfo.getRegionName());
-
- TableSplit split = new TableSplit(table.getName(),
- scan, splitStart, splitStop, regionHostname,
- encodedRegionName, regionSize);
-
- splits.add(split);
-
- if (LOG.isDebugEnabled())
- LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
- }
- }
- }
- }
- }
-
- return splits;
- }
-
- /**
- * Test if the given region is to be included in the InputSplit while
- * splitting the regions of a table.
- * <p>
-   * This optimization is effective when there is a specific reason to
-   * exclude an entire region from the M-R job (and hence have it not
-   * contribute to the InputSplit), given the start and end keys of that
-   * region. <br>
-   * It is useful when we need to remember the last-processed top record and
-   * revisit the [last, current) interval for M-R processing, continuously.
-   * In addition to reducing InputSplits, it reduces the load on the region
-   * server as well, due to the ordering of the keys. <br>
-   * <br>
-   * Note: it is possible that <code>endKey.length() == 0</code> for the last
-   * (most recent) region. <br>
-   * Override this method if you want to bulk exclude regions altogether from
-   * M-R. By default, no region is excluded (i.e. all regions are included).
- *
- * @param startKey Start key of the region
- * @param endKey End key of the region
- * @return true, if this region needs to be included as part of the input
- * (default).
- */
- protected boolean includeRegionInSplit(final byte[] startKey,
- final byte[] endKey) {
- return true;
- }
-
- /**
- * Allows subclasses to get the list of {@link Scan} objects.
- */
- protected List<Scan> getScans() {
- return this.scans;
- }
-
- /**
- * Allows subclasses to set the list of {@link Scan} objects.
- *
- * @param scans The list of {@link Scan} used to define the input
- */
- protected void setScans(List<Scan> scans) {
- this.scans = scans;
- }
-
- /**
- * Allows subclasses to set the {@link TableRecordReader}.
- *
- * @param tableRecordReader A different {@link TableRecordReader}
- * implementation.
- */
- protected void setTableRecordReader(TableRecordReader tableRecordReader) {
- this.tableRecordReader = tableRecordReader;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
deleted file mode 100644
index 4cc784f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * <p>
- * Hadoop output format that writes to one or more HBase tables. The key is
- * taken to be the table name while the output value <em>must</em> be either a
- * {@link Put} or a {@link Delete} instance. All tables must already exist, and
- * all Puts and Deletes must reference only valid column families.
- * </p>
- *
- * <p>
- * Write-ahead logging (WAL) for Puts can be disabled by setting
- * {@link #WAL_PROPERTY} to {@link #WAL_OFF}. Default value is {@link #WAL_ON}.
- * Note that disabling write-ahead logging is only appropriate for jobs where
- * loss of data due to region server failure can be tolerated (for example,
- * because it is easy to rerun a bulk import).
- * </p>
- */
-@InterfaceAudience.Public
-public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
- /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */
- public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";
- /** Property value to use write-ahead logging */
- public static final boolean WAL_ON = true;
- /** Property value to disable write-ahead logging */
- public static final boolean WAL_OFF = false;
- /**
- * Record writer for outputting to multiple HTables.
- */
- protected static class MultiTableRecordWriter extends
- RecordWriter<ImmutableBytesWritable, Mutation> {
- private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
- Connection connection;
- Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>();
- Configuration conf;
- boolean useWriteAheadLogging;
-
- /**
-     * @param conf
-     *          HBaseConfiguration to use
- * @param useWriteAheadLogging
- * whether to use write ahead logging. This can be turned off (
- * <tt>false</tt>) to improve performance when bulk loading data.
- */
- public MultiTableRecordWriter(Configuration conf,
- boolean useWriteAheadLogging) throws IOException {
-      LOG.debug("Created new MultiTableRecordWriter with WAL "
- + (useWriteAheadLogging ? "on" : "off"));
- this.conf = conf;
- this.useWriteAheadLogging = useWriteAheadLogging;
- }
-
- /**
- * @param tableName
- * the name of the table, as a string
- * @return the named mutator
- * @throws IOException
- * if there is a problem opening a table
- */
- BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException {
- if(this.connection == null){
- this.connection = ConnectionFactory.createConnection(conf);
- }
- if (!mutatorMap.containsKey(tableName)) {
- LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get())+ "\" for writing");
-
- BufferedMutator mutator =
- connection.getBufferedMutator(TableName.valueOf(tableName.get()));
- mutatorMap.put(tableName, mutator);
- }
- return mutatorMap.get(tableName);
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException {
- for (BufferedMutator mutator : mutatorMap.values()) {
- mutator.close();
- }
- if (connection != null) {
- connection.close();
- }
- }
-
- /**
- * Writes an action (Put or Delete) to the specified table.
- *
- * @param tableName
- * the table being updated.
- * @param action
- * the update, either a put or a delete.
- * @throws IllegalArgumentException
- * if the action is not a put or a delete.
- */
- @Override
- public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
- BufferedMutator mutator = getBufferedMutator(tableName);
- // The actions are not immutable, so we defensively copy them
- if (action instanceof Put) {
- Put put = new Put((Put) action);
- put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL
- : Durability.SKIP_WAL);
- mutator.mutate(put);
- } else if (action instanceof Delete) {
- Delete delete = new Delete((Delete) action);
- mutator.mutate(delete);
- } else
- throw new IllegalArgumentException(
- "action must be either Delete or Put");
- }
- }
-
- @Override
- public void checkOutputSpecs(JobContext context) throws IOException,
- InterruptedException {
- // we can't know ahead of time if it's going to blow up when the user
- // passes a table name that doesn't exist, so nothing useful here.
- }
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new TableOutputCommitter();
- }
-
- @Override
- public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
- return new MultiTableRecordWriter(HBaseConfiguration.create(conf),
- conf.getBoolean(WAL_PROPERTY, WAL_ON));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
deleted file mode 100644
index 0f07a58..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- * MultiTableSnapshotInputFormat generalizes
- * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
- * allowing a MapReduce job to run over one or more table snapshots, with one or more scans
- * configured for each.
- * Internally, the input format delegates to
- * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
- * and thus has the same performance advantages;
- * see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for
- * more details.
- * Usage is similar to TableSnapshotInputFormat, with the following exception:
- * initMultiTableSnapshotMapperJob takes in a map
- * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding
- * scan will be applied;
- * the overall dataset for the job is defined by the concatenation of the regions and tables
- * included in each snapshot/scan
- * pair.
- * {@link org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#initMultiTableSnapshotMapperJob(
- * java.util.Map, Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean,
- * org.apache.hadoop.fs.Path)}
- * can be used to configure the job.
- * <pre>{@code
- * Job job = new Job(conf);
- * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
- * "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
- * "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
- * );
- * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
- * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
- * snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
- * MyMapOutputValueWritable.class, job, true, restoreDir);
- * }
- * </pre>
- * Internally, this input format restores each snapshot into a subdirectory of the given tmp
- * directory. Input splits and record readers are created as described in
- * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} (one per region).
- * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
- * permissioning; the same caveats apply here.
- *
- * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
- * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
- */
-@InterfaceAudience.Public
-public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat {
-
- private final MultiTableSnapshotInputFormatImpl delegate;
-
- public MultiTableSnapshotInputFormat() {
- this.delegate = new MultiTableSnapshotInputFormatImpl();
- }
-
- @Override
- public List<InputSplit> getSplits(JobContext jobContext)
- throws IOException, InterruptedException {
- List<TableSnapshotInputFormatImpl.InputSplit> splits =
- delegate.getSplits(jobContext.getConfiguration());
- List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size());
-
- for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
- rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split));
- }
-
- return rtn;
- }
-
- public static void setInput(Configuration configuration,
- Map<String, Collection<Scan>> snapshotScans, Path tmpRestoreDir) throws IOException {
- new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
deleted file mode 100644
index 4331c0f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
-import org.apache.hadoop.hbase.util.ConfigurationUtil;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- * Shared implementation of mapreduce code over multiple table snapshots.
- * Utilized by both the mapreduce
- * ({@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormat}) and mapred
- * ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat}) implementations.
- */
-@InterfaceAudience.LimitedPrivate({ "HBase" })
-@InterfaceStability.Evolving
-public class MultiTableSnapshotInputFormatImpl {
-
- private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormatImpl.class);
-
- public static final String RESTORE_DIRS_KEY =
- "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";
- public static final String SNAPSHOT_TO_SCANS_KEY =
- "hbase.MultiTableSnapshotInputFormat.snapshotsToScans";
-
- /**
- * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
- * restoreDir.
- * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY}
- *
-   * @param conf configuration to modify
-   * @param snapshotScans map from snapshot name to the scans to run against it
-   * @param restoreDir base directory under which the snapshots are restored
- * @throws IOException
- */
- public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
- Path restoreDir) throws IOException {
- Path rootDir = FSUtils.getRootDir(conf);
- FileSystem fs = rootDir.getFileSystem(conf);
-
- setSnapshotToScans(conf, snapshotScans);
- Map<String, Path> restoreDirs =
- generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
- setSnapshotDirs(conf, restoreDirs);
- restoreSnapshots(conf, restoreDirs, fs);
- }
-
- /**
- * Return the list of splits extracted from the scans/snapshots pushed to conf by
- * {@link
- * #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)}
- *
- * @param conf Configuration to determine splits from
-   * @return the list of splits extracted from the scans/snapshots pushed to conf
- * @throws IOException
- */
- public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
- throws IOException {
- Path rootDir = FSUtils.getRootDir(conf);
- FileSystem fs = rootDir.getFileSystem(conf);
-
- List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();
-
- Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf);
- Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf);
- for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) {
- String snapshotName = entry.getKey();
-
- Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);
-
- SnapshotManifest manifest =
- TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
- List<HRegionInfo> regionInfos =
- TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);
-
- for (Scan scan : entry.getValue()) {
- List<TableSnapshotInputFormatImpl.InputSplit> splits =
- TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
- rtn.addAll(splits);
- }
- }
- return rtn;
- }
-
- /**
- * Retrieve the snapshot name -> list<scan> mapping pushed to configuration by
- * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)}
- *
- * @param conf Configuration to extract name -> list<scan> mappings from.
- * @return the snapshot name -> list<scan> mapping pushed to configuration
- * @throws IOException
- */
- public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {
-
- Map<String, Collection<Scan>> rtn = Maps.newHashMap();
-
- for (Map.Entry<String, String> entry : ConfigurationUtil
- .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) {
- String snapshotName = entry.getKey();
- String scan = entry.getValue();
-
- Collection<Scan> snapshotScans = rtn.get(snapshotName);
- if (snapshotScans == null) {
- snapshotScans = Lists.newArrayList();
- rtn.put(snapshotName, snapshotScans);
- }
-
- snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan));
- }
-
- return rtn;
- }
-
- /**
- * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY})
- *
-   * @param conf configuration to push the serialized scans to
-   * @param snapshotScans map from snapshot name to the scans to serialize
- * @throws IOException
- */
- public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans)
- throws IOException {
- // flatten out snapshotScans for serialization to the job conf
- List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList();
-
- for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
- String snapshotName = entry.getKey();
- Collection<Scan> scans = entry.getValue();
-
- // serialize all scans and map them to the appropriate snapshot
- for (Scan scan : scans) {
- snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(snapshotName,
- TableMapReduceUtil.convertScanToString(scan)));
- }
- }
-
- ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans);
- }
-
- /**
-   * Retrieve the directories into which snapshots have been restored
-   * ({@link #RESTORE_DIRS_KEY})
-   *
-   * @param conf Configuration to extract restore directories from
-   * @return the directories into which snapshots have been restored
- * @throws IOException
- */
- public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
- List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY);
- Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size());
-
- for (Map.Entry<String, String> kvp : kvps) {
- rtn.put(kvp.getKey(), new Path(kvp.getValue()));
- }
-
- return rtn;
- }
-
- public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
- Map<String, String> toSet = Maps.newHashMap();
-
- for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
- toSet.put(entry.getKey(), entry.getValue().toString());
- }
-
- ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet());
- }
-
- /**
- * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and
- * return a map from the snapshot to the restore directory.
- *
- * @param snapshots collection of snapshot names to restore
- * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored
- * @return a mapping from snapshot name to the directory in which that snapshot has been restored
- */
- private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots,
- Path baseRestoreDir) {
- Map<String, Path> rtn = Maps.newHashMap();
-
- for (String snapshotName : snapshots) {
- Path restoreSnapshotDir =
- new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString());
- rtn.put(snapshotName, restoreSnapshotDir);
- }
-
- return rtn;
- }
-
- /**
- * Restore each (snapshot name, restore directory) pair in snapshotToDir
- *
- * @param conf configuration to restore with
- * @param snapshotToDir mapping from snapshot names to restore directories
- * @param fs filesystem to do snapshot restoration on
- * @throws IOException
- */
- public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
- throws IOException {
- // TODO: restore from record readers to parallelize.
- Path rootDir = FSUtils.getRootDir(conf);
-
- for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
- String snapshotName = entry.getKey();
- Path restoreDir = entry.getValue();
- LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir
- + " for MultiTableSnapshotInputFormat");
- restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
- }
- }
-
- void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
- FileSystem fs) throws IOException {
- RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
deleted file mode 100644
index d1dba1d..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.StatusReporter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.ReflectionUtils;
-
-
-/**
- * Multithreaded implementation of {@link org.apache.hadoop.hbase.mapreduce.TableMapper}.
- * <p>
- * It can be used instead of the plain TableMapper when the map operation is not
- * CPU bound, in order to improve throughput.
- * <p>
- * Mapper implementations using this MapRunnable must be thread-safe.
- * <p>
- * The Map-Reduce job has to be configured with the mapper to use via
- * {@link #setMapperClass} and the number of threads the thread-pool can use
- * via the {@link #setNumberOfThreads} method. The default value is 10 threads.
- */
-
-public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
- private static final Log LOG = LogFactory.getLog(MultithreadedTableMapper.class);
- private Class<? extends Mapper<ImmutableBytesWritable, Result,K2,V2>> mapClass;
- private Context outer;
- private ExecutorService executor;
- public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads";
- public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass";
-
- /**
- * The number of threads in the thread pool that will run the map function.
- * @param job the job
- * @return the number of threads
- */
- public static int getNumberOfThreads(JobContext job) {
- return job.getConfiguration().
- getInt(NUMBER_OF_THREADS, 10);
- }
-
- /**
- * Set the number of threads in the pool for running maps.
- * @param job the job to modify
- * @param threads the new number of threads
- */
- public static void setNumberOfThreads(Job job, int threads) {
- job.getConfiguration().setInt(NUMBER_OF_THREADS,
- threads);
- }
-
- /**
- * Get the application's mapper class.
- * @param <K2> the map's output key type
- * @param <V2> the map's output value type
- * @param job the job
- * @return the mapper class to run
- */
- @SuppressWarnings("unchecked")
- public static <K2,V2>
- Class<Mapper<ImmutableBytesWritable, Result,K2,V2>> getMapperClass(JobContext job) {
- return (Class<Mapper<ImmutableBytesWritable, Result,K2,V2>>)
- job.getConfiguration().getClass( MAPPER_CLASS,
- Mapper.class);
- }
-
- /**
- * Set the application's mapper class.
- * @param <K2> the map output key type
- * @param <V2> the map output value type
- * @param job the job to modify
- * @param cls the class to use as the mapper
- */
- public static <K2,V2>
- void setMapperClass(Job job,
- Class<? extends Mapper<ImmutableBytesWritable, Result,K2,V2>> cls) {
- if (MultithreadedTableMapper.class.isAssignableFrom(cls)) {
- throw new IllegalArgumentException("Can't have recursive " +
- "MultithreadedTableMapper instances.");
- }
- job.getConfiguration().setClass(MAPPER_CLASS,
- cls, Mapper.class);
- }
-
- /**
- * Run the application's maps using a thread pool.
- */
- @Override
- public void run(Context context) throws IOException, InterruptedException {
- outer = context;
- int numberOfThreads = getNumberOfThreads(context);
- mapClass = getMapperClass(context);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Configuring multithread runner to use " + numberOfThreads +
- " threads");
- }
- executor = Executors.newFixedThreadPool(numberOfThreads);
- for(int i=0; i < numberOfThreads; ++i) {
- MapRunner thread = new MapRunner(context);
- executor.execute(thread);
- }
- executor.shutdown();
- while (!executor.isTerminated()) {
- // wait till all the threads are done
- Thread.sleep(1000);
- }
- }
-
- private class SubMapRecordReader
- extends RecordReader<ImmutableBytesWritable, Result> {
- private ImmutableBytesWritable key;
- private Result value;
- private Configuration conf;
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return 0;
- }
-
- @Override
- public void initialize(InputSplit split,
- TaskAttemptContext context
- ) throws IOException, InterruptedException {
- conf = context.getConfiguration();
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- synchronized (outer) {
- if (!outer.nextKeyValue()) {
- return false;
- }
- key = ReflectionUtils.copy(outer.getConfiguration(),
- outer.getCurrentKey(), key);
- value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
- return true;
- }
- }
-
- public ImmutableBytesWritable getCurrentKey() {
- return key;
- }
-
- @Override
- public Result getCurrentValue() {
- return value;
- }
- }
-
- private class SubMapRecordWriter extends RecordWriter<K2,V2> {
-
- @Override
- public void close(TaskAttemptContext context) throws IOException,
- InterruptedException {
- }
-
- @Override
- public void write(K2 key, V2 value) throws IOException,
- InterruptedException {
- synchronized (outer) {
- outer.write(key, value);
- }
- }
- }
-
- private class SubMapStatusReporter extends StatusReporter {
-
- @Override
- public Counter getCounter(Enum<?> name) {
- return outer.getCounter(name);
- }
-
- @Override
- public Counter getCounter(String group, String name) {
- return outer.getCounter(group, name);
- }
-
- @Override
- public void progress() {
- outer.progress();
- }
-
- @Override
- public void setStatus(String status) {
- outer.setStatus(status);
- }
-
- public float getProgress() {
- return 0;
- }
- }
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
- justification="Don't understand why FB is complaining about this one. We do throw exception")
- private class MapRunner implements Runnable {
- private Mapper<ImmutableBytesWritable, Result, K2,V2> mapper;
- private Context subcontext;
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- MapRunner(Context context) throws IOException, InterruptedException {
- mapper = ReflectionUtils.newInstance(mapClass,
- context.getConfiguration());
- try {
- Constructor c = context.getClass().getConstructor(
- Mapper.class,
- Configuration.class,
- TaskAttemptID.class,
- RecordReader.class,
- RecordWriter.class,
- OutputCommitter.class,
- StatusReporter.class,
- InputSplit.class);
- c.setAccessible(true);
- subcontext = (Context) c.newInstance(
- mapper,
- outer.getConfiguration(),
- outer.getTaskAttemptID(),
- new SubMapRecordReader(),
- new SubMapRecordWriter(),
- context.getOutputCommitter(),
- new SubMapStatusReporter(),
- outer.getInputSplit());
- } catch (Exception e) {
- try {
- Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl").getConstructor(
- Configuration.class,
- TaskAttemptID.class,
- RecordReader.class,
- RecordWriter.class,
- OutputCommitter.class,
- StatusReporter.class,
- InputSplit.class);
- c.setAccessible(true);
- MapContext mc = (MapContext) c.newInstance(
- outer.getConfiguration(),
- outer.getTaskAttemptID(),
- new SubMapRecordReader(),
- new SubMapRecordWriter(),
- context.getOutputCommitter(),
- new SubMapStatusReporter(),
- outer.getInputSplit());
- Class<?> wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
- Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class);
- subcontext = (Context) getMapContext.invoke(wrappedMapperClass.newInstance(), mc);
- } catch (Exception ee) { // FindBugs: REC_CATCH_EXCEPTION
- // rethrow as IOE
- throw new IOException(e);
- }
- }
- }
-
- @Override
- public void run() {
- try {
- mapper.run(subcontext);
- } catch (Throwable ie) {
- LOG.error("Problem in running map.", ie);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java
deleted file mode 100644
index 8997da9..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-
-@InterfaceAudience.Public
-public class MutationSerialization implements Serialization<Mutation> {
- @Override
- public boolean accept(Class<?> c) {
- return Mutation.class.isAssignableFrom(c);
- }
-
- @Override
- public Deserializer<Mutation> getDeserializer(Class<Mutation> c) {
- return new MutationDeserializer();
- }
-
- @Override
- public Serializer<Mutation> getSerializer(Class<Mutation> c) {
- return new MutationSerializer();
- }
-
- private static class MutationDeserializer implements Deserializer<Mutation> {
- private InputStream in;
-
- @Override
- public void close() throws IOException {
- in.close();
- }
-
- @Override
- public Mutation deserialize(Mutation mutation) throws IOException {
- MutationProto proto = MutationProto.parseDelimitedFrom(in);
- return ProtobufUtil.toMutation(proto);
- }
-
- @Override
- public void open(InputStream in) throws IOException {
- this.in = in;
- }
-
- }
- private static class MutationSerializer implements Serializer<Mutation> {
- private OutputStream out;
-
- @Override
- public void close() throws IOException {
- out.close();
- }
-
- @Override
- public void open(OutputStream out) throws IOException {
- this.out = out;
- }
-
- @Override
- public void serialize(Mutation mutation) throws IOException {
- MutationType type;
- if (mutation instanceof Put) {
- type = MutationType.PUT;
- } else if (mutation instanceof Delete) {
- type = MutationType.DELETE;
- } else {
- throw new IllegalArgumentException("Only Put and Delete are supported");
- }
- ProtobufUtil.toMutation(type, mutation).writeDelimitedTo(out);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
deleted file mode 100644
index f01e84f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.mapreduce.Reducer;
-
-/**
- * Combine Puts. Merges Put instances grouped by <code>K</code> into a single
- * instance.
- * @see TableMapReduceUtil
- */
-@InterfaceAudience.Public
-public class PutCombiner<K> extends Reducer<K, Put, K, Put> {
- private static final Log LOG = LogFactory.getLog(PutCombiner.class);
-
- @Override
- protected void reduce(K row, Iterable<Put> vals, Context context)
- throws IOException, InterruptedException {
- // Using HeapSize to create an upper bound on the memory size of
- // the puts and flush some portion of the content while looping. This
- // flush could result in multiple Puts for a single rowkey. That is
- // acceptable because Combiner is run as an optimization and it's not
- // critical that all Puts are grouped perfectly.
- long threshold = context.getConfiguration().getLong(
- "putcombiner.row.threshold", 1L * (1<<30));
- int cnt = 0;
- long curSize = 0;
- Put put = null;
- Map<byte[], List<Cell>> familyMap = null;
- for (Put p : vals) {
- cnt++;
- if (put == null) {
- put = p;
- familyMap = put.getFamilyCellMap();
- } else {
- for (Entry<byte[], List<Cell>> entry : p.getFamilyCellMap()
- .entrySet()) {
- List<Cell> cells = familyMap.get(entry.getKey());
-          List<Cell> kvs = cells; // null when this family is not yet in familyMap
- for (Cell cell : entry.getValue()) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- curSize += kv.heapSize();
- if (kvs != null) {
- kvs.add(kv);
- }
- }
- if (cells == null) {
- familyMap.put(entry.getKey(), entry.getValue());
- }
- }
- if (cnt % 10 == 0) context.setStatus("Combine " + cnt);
- if (curSize > threshold) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
- }
- context.write(row, put);
- put = null;
- curSize = 0;
- cnt = 0;
- }
- }
- }
- if (put != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
- }
- context.write(row, put);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
deleted file mode 100644
index 17ab9cb..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.TreeSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ArrayBackedTag;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagType;
-import org.apache.hadoop.hbase.TagUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.security.visibility.CellVisibility;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * Emits sorted Puts.
- * Reads in all Puts from the passed Iterator, sorts them, then emits
- * the Puts in sorted order. If a row has many columns, the sort can
- * use a large amount of memory.
- * @see HFileOutputFormat2
- * @see KeyValueSortReducer
- */
-@InterfaceAudience.Public
-public class PutSortReducer extends
- Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
- // the cell creator
- private CellCreator kvCreator;
-
- @Override
- protected void
- setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
- throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
- this.kvCreator = new CellCreator(conf);
- }
-
- @Override
- protected void reduce(
- ImmutableBytesWritable row,
- java.lang.Iterable<Put> puts,
- Reducer<ImmutableBytesWritable, Put,
- ImmutableBytesWritable, KeyValue>.Context context)
- throws java.io.IOException, InterruptedException
- {
- // although reduce() is called per-row, handle pathological case
- long threshold = context.getConfiguration().getLong(
- "putsortreducer.row.threshold", 1L * (1<<30));
- Iterator<Put> iter = puts.iterator();
- while (iter.hasNext()) {
- TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR);
- long curSize = 0;
- // stop at the end or the RAM threshold
- List<Tag> tags = new ArrayList<>();
- while (iter.hasNext() && curSize < threshold) {
- // clear the tags
- tags.clear();
- Put p = iter.next();
- long t = p.getTTL();
- if (t != Long.MAX_VALUE) {
- // add TTL tag if found
- tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t)));
- }
- byte[] acl = p.getACL();
- if (acl != null) {
- // add ACL tag if found
- tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl));
- }
- try {
- CellVisibility cellVisibility = p.getCellVisibility();
- if (cellVisibility != null) {
- // add the visibility labels if any
- tags.addAll(kvCreator.getVisibilityExpressionResolver()
- .createVisibilityExpTags(cellVisibility.getExpression()));
- }
- } catch (DeserializationException e) {
- // We just throw exception here. Should we allow other mutations to proceed by
- // just ignoring the bad one?
- throw new IOException("Invalid visibility expression found in mutation " + p, e);
- }
- for (List<Cell> cells: p.getFamilyCellMap().values()) {
- for (Cell cell: cells) {
-          // Create the KeyValue to be written directly to HFiles, using the
-          // CellCreator facade.
- KeyValue kv = null;
- TagUtil.carryForwardTags(tags, cell);
- if (!tags.isEmpty()) {
- kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(),
- cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
- cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
- cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(),
- cell.getValueOffset(), cell.getValueLength(), tags);
- } else {
- kv = KeyValueUtil.ensureKeyValue(cell);
- }
- if (map.add(kv)) {// don't count duplicated kv into size
- curSize += kv.heapSize();
- }
- }
- }
- }
- context.setStatus("Read " + map.size() + " entries of " + map.getClass()
- + "(" + StringUtils.humanReadableInt(curSize) + ")");
- int index = 0;
- for (KeyValue kv : map) {
- context.write(row, kv);
- if (++index % 100 == 0)
- context.setStatus("Wrote " + index);
- }
-
- // if we have more entries to process
- if (iter.hasNext()) {
- // force flush because we cannot guarantee intra-row sorted order
- context.write(null, null);
- }
- }
- }
-}
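
As the @see above hints, PutSortReducer is usually wired in by
HFileOutputFormat2.configureIncrementalLoad(), which selects it as the reduce
class when the job's map output value class is Put; the reducer then emits each
row's cells in sorted chunks bounded by "putsortreducer.row.threshold" (1 GiB by
default, per reduce() above). A minimal bulk-load sketch, assuming the
configureIncrementalLoad(Job, Table, RegionLocator) overload and using a
placeholder mapper, table name, and paths:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadSketch {

  /** Placeholder mapper: parses "row,family,qualifier,value" text lines into Puts. */
  static class TextToPutMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] f = line.toString().split(",");
      Put put = new Put(Bytes.toBytes(f[0]));
      put.addColumn(Bytes.toBytes(f[1]), Bytes.toBytes(f[2]), Bytes.toBytes(f[3]));
      context.write(new ImmutableBytesWritable(put.getRow()), put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "bulkload-sketch");
    job.setJarByClass(BulkLoadSketch.class);
    job.setMapperClass(TextToPutMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    FileInputFormat.addInputPath(job, new Path("/in"));        // placeholder input
    FileOutputFormat.setOutputPath(job, new Path("/hfiles"));  // placeholder output
    TableName name = TableName.valueOf("mytable");             // placeholder table
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(name);
        RegionLocator locator = conn.getRegionLocator(name)) {
      // Sets up PutSortReducer, the partitioner and HFile output for the table.
      HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
    }
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
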
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
deleted file mode 100644
index dff04b6..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-
-@InterfaceAudience.Public
-public class ResultSerialization extends Configured implements Serialization<Result> {
- private static final Log LOG = LogFactory.getLog(ResultSerialization.class);
- // The following configuration property indicates import file format version.
- public static final String IMPORT_FORMAT_VER = "hbase.import.version";
-
- @Override
- public boolean accept(Class<?> c) {
- return Result.class.isAssignableFrom(c);
- }
-
- @Override
- public Deserializer<Result> getDeserializer(Class<Result> c) {
- // check input format version
- Configuration conf = getConf();
- if (conf != null) {
- String inputVersion = conf.get(IMPORT_FORMAT_VER);
- if (inputVersion != null && inputVersion.equals("0.94")) {
- LOG.info("Load exported file using deserializer for HBase 0.94 format");
- return new Result94Deserializer();
- }
- }
-
- return new ResultDeserializer();
- }
-
- @Override
- public Serializer<Result> getSerializer(Class<Result> c) {
- return new ResultSerializer();
- }
-
- /**
-   * The following deserializer class is used to load files exported by HBase 0.94.
- */
- private static class Result94Deserializer implements Deserializer<Result> {
- private DataInputStream in;
-
- @Override
- public void close() throws IOException {
- in.close();
- }
-
- @Override
- public Result deserialize(Result mutation) throws IOException {
- int totalBuffer = in.readInt();
- if (totalBuffer == 0) {
- return Result.EMPTY_RESULT;
- }
- byte[] buf = new byte[totalBuffer];
- readChunked(in, buf, 0, totalBuffer);
- List<Cell> kvs = new ArrayList<>();
- int offset = 0;
- while (offset < totalBuffer) {
- int keyLength = Bytes.toInt(buf, offset);
- offset += Bytes.SIZEOF_INT;
- kvs.add(new KeyValue(buf, offset, keyLength));
- offset += keyLength;
- }
- return Result.create(kvs);
- }
-
- @Override
- public void open(InputStream in) throws IOException {
- if (!(in instanceof DataInputStream)) {
- throw new IOException("Wrong input stream instance passed in");
- }
- this.in = (DataInputStream) in;
- }
-
- private void readChunked(final DataInput in, byte[] dest, int ofs, int len) throws IOException {
- int maxRead = 8192;
-
- for (; ofs < len; ofs += maxRead)
- in.readFully(dest, ofs, Math.min(len - ofs, maxRead));
- }
- }
-
- private static class ResultDeserializer implements Deserializer<Result> {
- private InputStream in;
-
- @Override
- public void close() throws IOException {
- in.close();
- }
-
- @Override
- public Result deserialize(Result mutation) throws IOException {
- ClientProtos.Result proto = ClientProtos.Result.parseDelimitedFrom(in);
- return ProtobufUtil.toResult(proto);
- }
-
- @Override
- public void open(InputStream in) throws IOException {
- this.in = in;
- }
- }
-
- private static class ResultSerializer implements Serializer<Result> {
- private OutputStream out;
-
- @Override
- public void close() throws IOException {
- out.close();
- }
-
- @Override
- public void open(OutputStream out) throws IOException {
- this.out = out;
- }
-
- @Override
- public void serialize(Result result) throws IOException {
- ProtobufUtil.toResult(result).writeDelimitedTo(out);
- }
- }
-}
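
A Serialization only takes effect once it is listed under Hadoop's
"io.serializations" key, which is how the Export/Import tools pick this class
up. A minimal registration sketch with a placeholder job name; the last line is
optional and selects Result94Deserializer for 0.94-era export files:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.mapreduce.Job;

public class ResultSerializationSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(HBaseConfiguration.create(), "import-sketch");
    Configuration conf = job.getConfiguration();
    // Append to Hadoop's serialization factory; accept() above then claims any
    // class assignable to Result during the shuffle.
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        ResultSerialization.class.getName());
    // Only needed when reading files written by an HBase 0.94 export:
    conf.set(ResultSerialization.IMPORT_FORMAT_VER, "0.94");
  }
}
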
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
deleted file mode 100644
index 2e0591e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * A job with just a map phase to count rows. The map increments a counter
- * for each input row that has columns with content.
- */
-@InterfaceAudience.Public
-public class RowCounter extends Configured implements Tool {
-
- private static final Log LOG = LogFactory.getLog(RowCounter.class);
-
- /** Name of this 'program'. */
- static final String NAME = "rowcounter";
-
- private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
- private final static String EXPECTED_COUNT_KEY = RowCounter.class.getName() + ".expected_count";
-
- /**
- * Mapper that runs the count.
- */
- static class RowCounterMapper
- extends TableMapper<ImmutableBytesWritable, Result> {
-
- /** Counter enumeration to count the actual rows. */
- public static enum Counters {ROWS}
-
- /**
- * Maps the data.
- *
- * @param row The current table row key.
- * @param values The columns.
- * @param context The current context.
- * @throws IOException When something is broken with the data.
- * @see org.apache.hadoop.mapreduce.Mapper#map(Object, Object, Context)
- */
- @Override
- public void map(ImmutableBytesWritable row, Result values,
- Context context)
- throws IOException {
- // Count every row containing data, whether it's in qualifiers or values
- context.getCounter(Counters.ROWS).increment(1);
- }
- }
-
- /**
- * Sets up the actual job.
- *
- * @param conf The current configuration.
- * @param args The command line parameters.
- * @return The newly created job.
- * @throws IOException When setting up the job fails.
- */
- public static Job createSubmittableJob(Configuration conf, String[] args)
- throws IOException {
- String tableName = args[0];
- List<MultiRowRangeFilter.RowRange> rowRangeList = null;
- long startTime = 0;
- long endTime = 0;
-
- StringBuilder sb = new StringBuilder();
-
- final String rangeSwitch = "--range=";
- final String startTimeArgKey = "--starttime=";
- final String endTimeArgKey = "--endtime=";
- final String expectedCountArg = "--expected-count=";
-
- // First argument is table name, starting from second
- for (int i = 1; i < args.length; i++) {
- if (args[i].startsWith(rangeSwitch)) {
- try {
- rowRangeList = parseRowRangeParameter(args[i], rangeSwitch);
- } catch (IllegalArgumentException e) {
- return null;
- }
- continue;
- }
- if (args[i].startsWith(startTimeArgKey)) {
- startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
- continue;
- }
- if (args[i].startsWith(endTimeArgKey)) {
- endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
- continue;
- }
- if (args[i].startsWith(expectedCountArg)) {
- conf.setLong(EXPECTED_COUNT_KEY,
- Long.parseLong(args[i].substring(expectedCountArg.length())));
- continue;
- }
- // if no switch, assume column names
- sb.append(args[i]);
- sb.append(" ");
- }
- if (endTime < startTime) {
-      printUsage("--endtime=" + endTime + " must be greater than or equal to --starttime=" + startTime);
- return null;
- }
-
- Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
- job.setJarByClass(RowCounter.class);
- Scan scan = new Scan();
- scan.setCacheBlocks(false);
- setScanFilter(scan, rowRangeList);
- if (sb.length() > 0) {
- for (String columnName : sb.toString().trim().split(" ")) {
- String family = StringUtils.substringBefore(columnName, ":");
- String qualifier = StringUtils.substringAfter(columnName, ":");
-
- if (StringUtils.isBlank(qualifier)) {
- scan.addFamily(Bytes.toBytes(family));
- }
- else {
- scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
- }
- }
- }
- scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
- job.setOutputFormatClass(NullOutputFormat.class);
- TableMapReduceUtil.initTableMapperJob(tableName, scan,
- RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
- job.setNumReduceTasks(0);
- return job;
- }
-
- private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(
- String arg, String rangeSwitch) {
- final String[] ranges = arg.substring(rangeSwitch.length()).split(";");
- final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>();
- for (String range : ranges) {
- String[] startEnd = range.split(",", 2);
- if (startEnd.length != 2 || startEnd[1].contains(",")) {
- printUsage("Please specify range in such format as \"--range=a,b\" " +
- "or, with only one boundary, \"--range=,b\" or \"--range=a,\"");
- throw new IllegalArgumentException("Wrong range specification: " + range);
- }
- String startKey = startEnd[0];
- String endKey = startEnd[1];
- rangeList.add(new MultiRowRangeFilter.RowRange(
- Bytes.toBytesBinary(startKey), true,
- Bytes.toBytesBinary(endKey), false));
- }
- return rangeList;
- }
-
- /**
-   * Sets a filter {@link FilterBase} on the {@link Scan} instance.
-   * If the provided rowRangeList contains more than one element, a
-   * {@link MultiRowRangeFilter} is used; otherwise a {@link FirstKeyOnlyFilter} is used.
-   * If rowRangeList contains exactly one element, the scan's startRow and stopRow
-   * are also set from that range.
-   * @param scan the Scan to configure
-   * @param rowRangeList the parsed row ranges, may be null or empty
- */
- private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) {
- final int size = rowRangeList == null ? 0 : rowRangeList.size();
- if (size <= 1) {
- scan.setFilter(new FirstKeyOnlyFilter());
- }
- if (size == 1) {
- MultiRowRangeFilter.RowRange range = rowRangeList.get(0);
- scan.setStartRow(range.getStartRow()); //inclusive
- scan.setStopRow(range.getStopRow()); //exclusive
- } else if (size > 1) {
- scan.setFilter(new MultiRowRangeFilter(rowRangeList));
- }
- }
-
- /*
-   * @param errorMessage the error message to print before the usage text
- */
- private static void printUsage(String errorMessage) {
- System.err.println("ERROR: " + errorMessage);
- printUsage();
- }
-
- /**
- * Prints usage without error message.
-   * Note that we don't document --expected-count, because it is intended for tests.
- */
- private static void printUsage() {
- System.err.println("Usage: RowCounter [options] <tablename> " +
-        "[--starttime=[start] --endtime=[end]] " +
- "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]");
- System.err.println("For performance consider the following options:\n"
- + "-Dhbase.client.scanner.caching=100\n"
- + "-Dmapreduce.map.speculative=false");
- }
-
- @Override
- public int run(String[] args) throws Exception {
- if (args.length < 1) {
- printUsage("Wrong number of parameters: " + args.length);
- return -1;
- }
- Job job = createSubmittableJob(getConf(), args);
- if (job == null) {
- return -1;
- }
- boolean success = job.waitForCompletion(true);
- final long expectedCount = getConf().getLong(EXPECTED_COUNT_KEY, -1);
- if (success && expectedCount != -1) {
- final Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
- success = expectedCount == counter.getValue();
- if (!success) {
- LOG.error("Failing job because count of '" + counter.getValue() +
- "' does not match expected count of '" + expectedCount + "'");
- }
- }
- return (success ? 0 : 1);
- }
-
- /**
- * Main entry point.
- * @param args The command line parameters.
- * @throws Exception When running the job fails.
- */
- public static void main(String[] args) throws Exception {
- int errCode = ToolRunner.run(HBaseConfiguration.create(), new RowCounter(), args);
- System.exit(errCode);
- }
-
-}
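
Beyond the main() entry point, the Tool interface makes RowCounter easy to
drive programmatically, e.g. from tests. A rough sketch with placeholder
arguments, mirroring the switches parsed in createSubmittableJob():

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.RowCounter;
import org.apache.hadoop.util.ToolRunner;

public class RowCounterSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder table, key ranges, time window and column. An --endtime of 0
    // would mean HConstants.LATEST_TIMESTAMP, per createSubmittableJob() above.
    int rc = ToolRunner.run(HBaseConfiguration.create(), new RowCounter(),
        new String[] { "mytable", "--range=a,m;n,z", "--starttime=0",
            "--endtime=1503800000000", "cf1:col1" });
    System.exit(rc);
  }
}
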
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
deleted file mode 100644
index 4ba1088..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-/**
- * A partitioner that takes start and end keys and uses BigDecimal to figure
- * out which reducer a key belongs to. Pass the start and end
- * keys in the Configuration using <code>hbase.simpletotalorder.start</code>
- * and <code>hbase.simpletotalorder.end</code>. The end key needs to be
- * exclusive; i.e. one larger than the biggest key in your key space.
- * You may be surprised at how this class partitions the space; it may not
- * align with preconceptions; e.g. a start key of zero and an end key of 100
- * divided in ten will not make regions whose range is 0-10, 10-20, and so on.
- * Make your own partitioner if you need the region spacing to come out a
- * particular way.
- * @param <VALUE> the map output value type
- * @see #START
- * @see #END
- */
-@InterfaceAudience.Public
-public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
-implements Configurable {
- private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class);
-
- @Deprecated
- public static final String START = "hbase.simpletotalorder.start";
- @Deprecated
- public static final String END = "hbase.simpletotalorder.end";
-
- static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
- static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
-
- private Configuration c;
- private byte [] startkey;
- private byte [] endkey;
- private byte [][] splits;
- private int lastReduces = -1;
-
- public static void setStartKey(Configuration conf, byte[] startKey) {
- conf.set(START_BASE64, Base64.encodeBytes(startKey));
- }
-
- public static void setEndKey(Configuration conf, byte[] endKey) {
- conf.set(END_BASE64, Base64.encodeBytes(endKey));
- }
-
- @SuppressWarnings("deprecation")
- static byte[] getStartKey(Configuration conf) {
- return getKeyFromConf(conf, START_BASE64, START);
- }
-
- @SuppressWarnings("deprecation")
- static byte[] getEndKey(Configuration conf) {
- return getKeyFromConf(conf, END_BASE64, END);
- }
-
- private static byte[] getKeyFromConf(Configuration conf,
- String base64Key, String deprecatedKey) {
- String encoded = conf.get(base64Key);
- if (encoded != null) {
- return Base64.decode(encoded);
- }
- String oldStyleVal = conf.get(deprecatedKey);
- if (oldStyleVal == null) {
- return null;
- }
- LOG.warn("Using deprecated configuration " + deprecatedKey +
- " - please use static accessor methods instead.");
- return Bytes.toBytesBinary(oldStyleVal);
- }
-
- @Override
- public int getPartition(final ImmutableBytesWritable key, final VALUE value,
- final int reduces) {
- if (reduces == 1) return 0;
- if (this.lastReduces != reduces) {
- this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
- for (int i = 0; i < splits.length; i++) {
- LOG.info(Bytes.toStringBinary(splits[i]));
- }
- this.lastReduces = reduces;
- }
- int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
- key.getLength());
-    // As in the HFile index search: a negative result from binarySearch encodes
-    // the insertion point, so decode it to the index of the containing range.
- if (pos < 0) {
- pos++;
- pos *= -1;
- if (pos == 0) {
- // falls before the beginning of the file.
- throw new RuntimeException("Key outside start/stop range: " +
- key.toString());
- }
- pos--;
- }
- return pos;
- }
-
- @Override
- public Configuration getConf() {
- return this.c;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.c = conf;
- this.startkey = getStartKey(conf);
- this.endkey = getEndKey(conf);
- if (startkey == null || endkey == null) {
- throw new RuntimeException(this.getClass() + " not configured");
- }
- LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
- ", endkey=" + Bytes.toStringBinary(endkey));
- // Reset last reduces count on change of Start / End key
- this.lastReduces = -1;
- }
-}
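
Configuration normally goes through the Base64-backed static accessors above
rather than the deprecated START/END keys. A minimal sketch with placeholder
boundary keys and reducer count; note that the partition boundaries come from
Bytes.split(startkey, endkey, reduces - 1), so they are byte-interpolated, not
decimal, split points:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class PartitionerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Placeholder key space; the end key is exclusive, per the class javadoc.
    SimpleTotalOrderPartitioner.setStartKey(conf, Bytes.toBytes("row-000"));
    SimpleTotalOrderPartitioner.setEndKey(conf, Bytes.toBytes("row-999"));
    Job job = Job.getInstance(conf, "partitioner-sketch");
    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    job.setNumReduceTasks(4);

    // Standalone check of where a key would land with 4 reducers.
    SimpleTotalOrderPartitioner<ImmutableBytesWritable> p =
        new SimpleTotalOrderPartitioner<>();
    p.setConf(conf);
    int part = p.getPartition(new ImmutableBytesWritable(Bytes.toBytes("row-500")), null, 4);
    System.out.println("row-500 -> partition " + part);
  }
}
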