You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by dk...@apache.org on 2016/08/05 23:27:33 UTC
[3/9] incubator-atlas git commit: ATLAS-693 Titan 0.5.4
implementation of graph db abstraction. (jnhagelb via dkantor)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
deleted file mode 100644
index 2c0d6fe..0000000
--- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.util.Arrays;
-
-import org.apache.hadoop.hbase.util.VersionInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Chooses and instantiates the {@link HBaseCompat} implementation used to mask
- * HBase API differences, and caches the first instance it creates.
- */
-public class HBaseCompatLoader {
-
- private static final Logger log = LoggerFactory.getLogger(HBaseCompatLoader.class);
-
- private static final String DEFAULT_HBASE_COMPAT_VERSION = "1.1";
-
- private static final String DEFAULT_HBASE_CLASS_NAME = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_1";
-
- // Populated by the first successful getCompat() call and reused afterwards.
- private static HBaseCompat cachedCompat;
-
- /**
-  * Returns the shared compatibility layer, creating it on the first call.
-  * <p>
-  * Once an instance has been cached it is returned as-is, so
-  * {@code classOverride} only has an effect on the call that creates it.
-  *
-  * @param classOverride fully qualified class name to instantiate, or
-  *        {@code null} to pick a class from {@code VersionInfo.getVersion()}
-  * @return the cached or newly created compatibility layer
-  * @throws RuntimeException if the chosen class cannot be found or instantiated
-  */
- public synchronized static HBaseCompat getCompat(String classOverride) {
-
-     if (null != cachedCompat) {
-         log.debug("Returning cached HBase compatibility layer: {}", cachedCompat);
-         return cachedCompat;
-     }
-
-     HBaseCompat compat;
-     String className = null;
-     String classNameSource = null;
-
-     if (null != classOverride) {
-         className = classOverride;
-         classNameSource = "from explicit configuration";
-     } else {
-         String hbaseVersion = VersionInfo.getVersion();
-         for (String supportedVersion : Arrays.asList("0.94", "0.96", "0.98", "1.0", "1.1")) {
-             // The trailing dot keeps e.g. "1.10.x" from matching the "1.1" prefix.
-             if (hbaseVersion.startsWith(supportedVersion + ".")) {
-                 // Literal character replacement; no regular expression is needed here.
-                 className = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat" + supportedVersion.replace('.', '_');
-                 classNameSource = "supporting runtime HBase version " + hbaseVersion;
-                 break;
-             }
-         }
-         if (null == className) {
-             log.info("The HBase version {} is not explicitly supported by Titan. " +
-                     "Loading Titan's compatibility layer for its most recent supported HBase version ({})",
-                     hbaseVersion, DEFAULT_HBASE_COMPAT_VERSION);
-             className = DEFAULT_HBASE_CLASS_NAME;
-             // No leading space: the log template below already separates the words.
-             classNameSource = "by default";
-         }
-     }
-
-     final String errTemplate = " when instantiating HBase compatibility class " + className;
-
-     try {
-         compat = (HBaseCompat)Class.forName(className).newInstance();
-         log.info("Instantiated HBase compatibility layer {}: {}", classNameSource, compat.getClass().getCanonicalName());
-     } catch (IllegalAccessException e) {
-         throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
-     } catch (InstantiationException e) {
-         throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
-     } catch (ClassNotFoundException e) {
-         throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
-     }
-
-     return cachedCompat = compat;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
deleted file mode 100644
index c5f6e0d..0000000
--- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.thinkaurelius.titan.core.attribute.Duration;
-import com.thinkaurelius.titan.diskstorage.*;
-import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
-import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
-import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
-import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
-import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
-import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
-import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
-import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
-import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList;
-import com.thinkaurelius.titan.diskstorage.util.time.Timepoint;
-import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
-import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
-import com.thinkaurelius.titan.util.system.IOUtils;
-
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
-import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.*;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Here are some areas that might need work:
- * <p/>
- * - batching? (consider HTable#batch, HTable#setAutoFlush(false))
- * - tuning HTable#setWriteBufferSize (?)
- * - writing a server-side filter to replace ColumnCountGetFilter, which drops
- * all columns on the row where it reaches its limit. This requires getSlice,
- * currently, to impose its limit on the client side. That obviously won't
- * scale.
- * - RowMutations for combining Puts+Deletes (need a newer HBase than 0.92 for this)
- * - (maybe) fiddle with HTable#setRegionCachePrefetch and/or #prewarmRegionCache
- * <p/>
- * There may be other problem areas. These are just the ones of which I'm aware.
- */
-public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
-
- private static final Logger logger = LoggerFactory.getLogger(HBaseKeyColumnValueStore.class);
-
- private final String tableName; // table name handed to cnx.getTable() for every read and scan
- private final HBaseStoreManager storeManager; // owning manager; mutations are delegated to it
-
- // When using shortened CF names, columnFamily is the shortname and storeName is the longname
- // When not using shortened CF names, they are the same
- //private final String columnFamily;
- private final String storeName; // returned by getName(); also the key under which mutations are handed to the manager
- // This is columnFamily.getBytes()
- private final byte[] columnFamilyBytes;
- private final HBaseGetter entryGetter; // extracts column, value and metadata from HBase result cells
-
- private final ConnectionMask cnx; // source of table handles
-
- private LocalLockMediator<StoreTransaction> localLockMediator; // in-process lock arbiter used by acquireLock()
-
- private final Duration lockExpiryTimeMs; // read from GraphDatabaseConfiguration.LOCK_EXPIRE
- private final Duration lockMaxWaitTimeMs; // read from GraphDatabaseConfiguration.LOCK_WAIT; slept between failed lock attempts
- private final Integer lockMaxRetries; // read from GraphDatabaseConfiguration.LOCK_RETRY; upper bound on lock attempts
-
- /**
-  * Creates a store bound to one column family of the given table.
-  *
-  * @param storeManager manager that owns this store; also supplies the storage
-  *        configuration and the metadata schema for {@code storeName}
-  * @param cnx connection used to obtain table handles
-  * @param tableName name of the HBase table to read from
-  * @param columnFamily column family name as stored in HBase
-  * @param storeName store name reported by {@link #getName()}
-  * @param llm local lock mediator used by {@code acquireLock}
-  */
- HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName, LocalLockMediator<StoreTransaction> llm) {
-     this.storeManager = storeManager;
-     this.cnx = cnx;
-     this.tableName = tableName;
-     this.storeName = storeName;
-     // Encode through HBase's Bytes (UTF-8) rather than String.getBytes(), whose
-     // result depends on the JVM default charset; this also round-trips with the
-     // Bytes.toString(columnFamilyBytes) calls used elsewhere in this class.
-     this.columnFamilyBytes = Bytes.toBytes(columnFamily);
-     this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName));
-     this.localLockMediator = llm;
-     Configuration storageConfig = storeManager.getStorageConfig();
-     this.lockExpiryTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_EXPIRE);
-     this.lockMaxWaitTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_WAIT);
-     this.lockMaxRetries = storageConfig.get(GraphDatabaseConfiguration.LOCK_RETRY);
- }
-
- /**
-  * Does nothing: this store keeps no resource of its own open between calls.
-  */
- @Override
- public void close() throws BackendException {
-     // intentionally empty
- }
-
- /**
-  * Reads the columns selected by {@code query} for the single row key it names.
-  *
-  * @return the entries found for that key, or {@link EntryList#EMPTY_LIST}
-  *         when the lookup produced no values
-  */
- @Override
- public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
-     final List<StaticBuffer> singleKey = Arrays.asList(query.getKey());
-     final Filter columnFilter = getFilter(query);
-     final Map<StaticBuffer, EntryList> byKey = getHelper(singleKey, columnFilter);
-     return Iterables.getOnlyElement(byKey.values(), EntryList.EMPTY_LIST);
- }
-
- /**
-  * Reads the columns selected by {@code query} for every key in {@code keys}.
-  *
-  * @return a map from row key to the entries read for it
-  */
- @Override
- public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
-     final Filter columnFilter = getFilter(query);
-     return getHelper(keys, columnFilter);
- }
-
- /**
-  * Applies additions and deletions to a single row by wrapping them in a
-  * one-entry mutation map and passing it to {@code mutateMany}.
-  */
- @Override
- public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
-     final KCVMutation rowChange = new KCVMutation(additions, deletions);
-     final Map<StaticBuffer, KCVMutation> singleRow = ImmutableMap.of(key, rowChange);
-     mutateMany(singleRow, txh);
- }
-
- /**
-  * Acquires the local lock for the given key/column pair and records it,
-  * together with the expected value, on the transaction.
-  * <p>
-  * Up to {@code lockMaxRetries} attempts are made; waiting between attempts
-  * and the failure on the last attempt are handled by
-  * {@link #handleLockFailure}.
-  *
-  * @throws PermanentLockingException if the lock could not be obtained,
-  *         including when the configured retry count allows no attempt at all
-  */
- @Override
- public void acquireLock(StaticBuffer key,
-         StaticBuffer column,
-         StaticBuffer expectedValue,
-         StoreTransaction txh) throws BackendException {
-
-     KeyColumn lockID = new KeyColumn(key, column);
-     logger.debug("Attempting to acquireLock on {} ", lockID);
-     int trialCount = 0;
-     boolean locked = false;
-     while (!locked && trialCount < lockMaxRetries) {
-         final Timepoint lockStartTime = Timestamps.MILLI.getTime(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-         locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTimeMs));
-         trialCount++;
-         if (locked) {
-             logger.debug("Acquired lock on {}, {}", lockID, txh);
-         } else {
-             // Sleeps before the next attempt, or throws once attempts are exhausted.
-             handleLockFailure(txh, lockID, trialCount);
-         }
-     }
-     if (!locked) {
-         // Only reachable when the loop never ran (retry count <= 0). Without this
-         // guard the lock would be recorded on the transaction without being held.
-         throw new PermanentLockingException("Could not lock the keyColumn " + lockID
-                 + " on CF " + Bytes.toString(columnFamilyBytes)
-                 + ": no lock attempt was made (lock retry count is " + lockMaxRetries + ")");
-     }
-     ((HBaseTransaction) txh).updateLocks(lockID, expectedValue);
- }
-
- /**
-  * Reacts to a failed lock attempt: sleeps for the configured wait time when
-  * more attempts remain, otherwise gives up.
-  *
-  * @param txh transaction that requested the lock (used in error messages)
-  * @param lockID key/column pair that could not be locked
-  * @param trialCount number of attempts made so far
-  * @throws PermanentLockingException if no attempts remain, or if the thread
-  *         is interrupted while waiting (the interrupt flag is restored first)
-  */
- void handleLockFailure(StoreTransaction txh, KeyColumn lockID, int trialCount) throws PermanentLockingException {
-     if (trialCount < lockMaxRetries) {
-         try {
-             Thread.sleep(lockMaxWaitTimeMs.getLength(TimeUnit.MILLISECONDS));
-         } catch (InterruptedException e) {
-             // Restore the flag so code further up the stack can still see the interrupt.
-             Thread.currentThread().interrupt();
-             throw new PermanentLockingException(
-                     "Interrupted while waiting for acquiring lock for transaction "
-                     + txh + " lockID " + lockID + " on retry " + trialCount, e);
-         }
-     } else {
-         throw new PermanentLockingException("Could not lock the keyColumn " +
-                 lockID + " on CF " + Bytes.toString(columnFamilyBytes));
-     }
- }
-
- /**
-  * Scans the row keys between the query's start and end key, applying the
-  * query's column slice to each row.
-  */
- @Override
- public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException {
-     final byte[] firstRow = query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY);
-     final byte[] stopRow = query.getKeyEnd().as(StaticBuffer.ARRAY_FACTORY);
-     final FilterList scanFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
-     return executeKeySliceQuery(firstRow, stopRow, scanFilters, query);
- }
-
- /**
-  * @return the store name this instance was constructed with
-  */
- @Override
- public String getName() {
-     return this.storeName;
- }
-
- /**
-  * Scans all row keys without a start or stop row, applying the query's
-  * column slice to each row.
-  */
- @Override
- public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException {
-     final FilterList scanFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
-     return executeKeySliceQuery(scanFilters, query);
- }
-
- /**
-  * Builds the HBase filter that corresponds to a Titan column slice.
-  * <p>
-  * The slice start is inclusive and the slice end exclusive. A bound of
-  * length zero is passed to the range filter as {@code null}. When the query
-  * has a limit, a pagination filter with that limit and offset 0 is combined
-  * with the range filter.
-  *
-  * @param query slice whose start, end and optional limit define the filter
-  * @return the range filter, or a MUST_PASS_ALL list of range plus pagination filter
-  */
- public static Filter getFilter(SliceQuery query) {
-     // Each bound is guarded by its own length. Previously the start bound was
-     // guarded by the END length, so an empty end dropped a non-empty start.
-     byte[] colStartBytes = query.getSliceStart().length() > 0 ? query.getSliceStart().as(StaticBuffer.ARRAY_FACTORY) : null;
-     byte[] colEndBytes = query.getSliceEnd().length() > 0 ? query.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY) : null;
-
-     Filter filter = new ColumnRangeFilter(colStartBytes, true, colEndBytes, false);
-
-     if (query.hasLimit()) {
-         filter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
-                 filter,
-                 new ColumnPaginationFilter(query.getLimit(), 0));
-     }
-
-     logger.debug("Generated HBase Filter {}", filter);
-
-     return filter;
- }
-
- private Map<StaticBuffer,EntryList> getHelper(List<StaticBuffer> keys, Filter getFilter) throws BackendException {
- List<Get> requests = new ArrayList<Get>(keys.size());
- {
- for (StaticBuffer key : keys) {
- Get g = new Get(key.as(StaticBuffer.ARRAY_FACTORY)).addFamily(columnFamilyBytes).setFilter(getFilter);
- try {
- g.setTimeRange(0, Long.MAX_VALUE);
- } catch (IOException e) {
- throw new PermanentBackendException(e);
- }
- requests.add(g);
- }
- }
-
- Map<StaticBuffer,EntryList> resultMap = new HashMap<StaticBuffer,EntryList>(keys.size());
-
- try {
- TableMask table = null;
- Result[] results = null;
-
- try {
- table = cnx.getTable(tableName);
- logger.debug("Get requests {} {} ", Bytes.toString(columnFamilyBytes), requests.size());
- results = table.get(requests);
- logger.debug("Get requests finished {} {} ", Bytes.toString(columnFamilyBytes), requests.size());
- } finally {
- IOUtils.closeQuietly(table);
- }
-
- if (results == null)
- return KCVSUtil.emptyResults(keys);
-
- assert results.length==keys.size();
-
- for (int i = 0; i < results.length; i++) {
- Result result = results[i];
- NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> f = result.getMap();
-
- if (f == null) { // no result for this key
- resultMap.put(keys.get(i), EntryList.EMPTY_LIST);
- continue;
- }
-
- // actual key with <timestamp, value>
- NavigableMap<byte[], NavigableMap<Long, byte[]>> r = f.get(columnFamilyBytes);
- resultMap.put(keys.get(i), (r == null)
- ? EntryList.EMPTY_LIST
- : StaticArrayEntryList.ofBytes(r.entrySet(), entryGetter));
- }
-
- return resultMap;
- } catch (IOException e) {
- throw new TemporaryBackendException(e);
- }
- }
-
- private void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
- storeManager.mutateMany(ImmutableMap.of(storeName, mutations), txh);
- }
-
- private KeyIterator executeKeySliceQuery(FilterList filters, @Nullable SliceQuery columnSlice) throws BackendException {
- return executeKeySliceQuery(null, null, filters, columnSlice);
- }
-
- private KeyIterator executeKeySliceQuery(@Nullable byte[] startKey,
- @Nullable byte[] endKey,
- FilterList filters,
- @Nullable SliceQuery columnSlice) throws BackendException {
- Scan scan = new Scan().addFamily(columnFamilyBytes);
-
- try {
- scan.setTimeRange(0, Long.MAX_VALUE);
- } catch (IOException e) {
- throw new PermanentBackendException(e);
- }
-
- if (startKey != null)
- scan.setStartRow(startKey);
-
- if (endKey != null)
- scan.setStopRow(endKey);
-
- if (columnSlice != null) {
- filters.addFilter(getFilter(columnSlice));
- }
-
- TableMask table = null;
-
- logger.debug("Scan for row keys {} {} ", Bytes.toString(startKey), Bytes.toString(endKey));
- try {
- table = cnx.getTable(tableName);
- return new RowIterator(table, table.getScanner(scan.setFilter(filters)), columnFamilyBytes);
- } catch (IOException e) {
- IOUtils.closeQuietly(table);
- throw new PermanentBackendException(e);
- }
- }
-
- private class RowIterator implements KeyIterator {
- private final Closeable table;
- private final Iterator<Result> rows;
- private final byte[] columnFamilyBytes;
-
- private Result currentRow;
- private boolean isClosed;
-
- public RowIterator(Closeable table, ResultScanner rows, byte[] columnFamilyBytes) {
- this.table = table;
- this.columnFamilyBytes = Arrays.copyOf(columnFamilyBytes, columnFamilyBytes.length);
- this.rows = Iterators.filter(rows.iterator(), new Predicate<Result>() {
- @Override
- public boolean apply(@Nullable Result result) {
- if (result == null)
- return false;
-
- try {
- StaticBuffer id = StaticArrayBuffer.of(result.getRow());
- id.getLong(0);
- } catch (NumberFormatException e) {
- return false;
- }
-
- return true;
- }
- });
- }
-
- @Override
- public RecordIterator<Entry> getEntries() {
- ensureOpen();
-
- return new RecordIterator<Entry>() {
- private final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> currentMap = currentRow.getMap();
- private final Iterator<Map.Entry<byte[], NavigableMap<Long, byte[]>>> kv = currentMap == null ? null : currentMap.get(columnFamilyBytes).entrySet().iterator();
-
- @Override
- public boolean hasNext() {
- ensureOpen();
- return kv == null ? false : kv.hasNext();
- }
-
- @Override
- public Entry next() {
- ensureOpen();
- return kv == null ? null : StaticArrayEntry.ofBytes(kv.next(), entryGetter);
- }
-
- @Override
- public void close() {
- isClosed = true;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- @Override
- public boolean hasNext() {
- ensureOpen();
- return rows.hasNext();
- }
-
- @Override
- public StaticBuffer next() {
- ensureOpen();
-
- currentRow = rows.next();
- return StaticArrayBuffer.of(currentRow.getRow());
- }
-
- @Override
- public void close() {
- IOUtils.closeQuietly(table);
- isClosed = true;
- logger.debug("RowIterator closed table {}", table);
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- private void ensureOpen() {
- if (isClosed)
- throw new IllegalStateException("Iterator has been closed.");
- }
- }
-
- /**
-  * Adapts one HBase column entry (qualifier mapped to its timestamp/value
-  * map) to the accessor interface used when building Titan entries. Value
-  * and timestamp are both taken from the last entry of the timestamp map.
-  */
- private static class HBaseGetter implements StaticArrayEntry.GetColVal<Map.Entry<byte[], NavigableMap<Long, byte[]>>, byte[]> {
-
-     private final EntryMetaData[] schema;
-
-     private HBaseGetter(EntryMetaData[] schema) {
-         this.schema = schema;
-     }
-
-     /** Returns the last (timestamp, value) pair recorded for the column. */
-     private static Map.Entry<Long, byte[]> lastVersion(Map.Entry<byte[], NavigableMap<Long, byte[]>> cell) {
-         final NavigableMap<Long, byte[]> versions = cell.getValue();
-         return versions.lastEntry();
-     }
-
-     @Override
-     public byte[] getColumn(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
-         final byte[] qualifier = element.getKey();
-         return qualifier;
-     }
-
-     @Override
-     public byte[] getValue(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
-         return lastVersion(element).getValue();
-     }
-
-     @Override
-     public EntryMetaData[] getMetaSchema(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
-         return this.schema;
-     }
-
-     @Override
-     public Object getMetaData(Map.Entry<byte[], NavigableMap<Long, byte[]>> element, EntryMetaData meta) {
-         switch(meta) {
-             case TIMESTAMP:
-                 return lastVersion(element).getKey();
-             default:
-                 throw new UnsupportedOperationException("Unsupported meta data: " + meta);
-         }
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java
deleted file mode 100644
index a94a7e4..0000000
--- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java
+++ /dev/null
@@ -1,935 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import com.thinkaurelius.titan.diskstorage.Backend;
-import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement;
-import com.thinkaurelius.titan.diskstorage.keycolumnvalue.CustomizeStoreKCVSManager;
-import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
-import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediators;
-import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableNotEnabledException;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.VersionInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.ImmutableBiMap;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-import com.thinkaurelius.titan.core.TitanException;
-import com.thinkaurelius.titan.diskstorage.BackendException;
-import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
-import com.thinkaurelius.titan.diskstorage.Entry;
-import com.thinkaurelius.titan.diskstorage.PermanentBackendException;
-import com.thinkaurelius.titan.diskstorage.StaticBuffer;
-import com.thinkaurelius.titan.diskstorage.TemporaryBackendException;
-import com.thinkaurelius.titan.diskstorage.common.DistributedStoreManager;
-import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
-import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
-import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
-import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
-import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStore;
-import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
-import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange;
-import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StandardStoreFeatures;
-import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures;
-import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
-import com.thinkaurelius.titan.diskstorage.util.BufferUtil;
-import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
-import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
-import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
-import com.thinkaurelius.titan.util.system.IOUtils;
-import com.thinkaurelius.titan.util.system.NetworkUtil;
-
-/**
- * Storage Manager for HBase
- *
- * @author Dan LaRocque <da...@hopcount.org>
- */
-@PreInitializeConfigOptions
-public class HBaseStoreManager extends DistributedStoreManager implements KeyColumnValueStoreManager, CustomizeStoreKCVSManager {
-
- private static final Logger logger = LoggerFactory.getLogger(HBaseStoreManager.class);
-
- public static final ConfigNamespace HBASE_NS =
- new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, "hbase", "HBase storage options");
-
- public static final ConfigOption<Boolean> SHORT_CF_NAMES =
- new ConfigOption<Boolean>(HBASE_NS, "short-cf-names",
- "Whether to shorten the names of Titan's column families to one-character mnemonics " +
- "to conserve storage space", ConfigOption.Type.FIXED, true);
-
- public static final String COMPRESSION_DEFAULT = "-DEFAULT-";
-
- public static final ConfigOption<String> COMPRESSION =
- new ConfigOption<String>(HBASE_NS, "compression-algorithm",
- "An HBase Compression.Algorithm enum string which will be applied to newly created column families. " +
- "The compression algorithm must be installed and available on the HBase cluster. Titan cannot install " +
- "and configure new compression algorithms on the HBase cluster by itself.",
- ConfigOption.Type.MASKABLE, "GZ");
-
- public static final ConfigOption<Boolean> SKIP_SCHEMA_CHECK =
- new ConfigOption<Boolean>(HBASE_NS, "skip-schema-check",
- "Assume that Titan's HBase table and column families already exist. " +
- "When this is true, Titan will not check for the existence of its table/CFs, " +
- "nor will it attempt to create them under any circumstances. This is useful " +
- "when running Titan without HBase admin privileges.",
- ConfigOption.Type.MASKABLE, false);
-
- public static final ConfigOption<String> HBASE_TABLE =
- new ConfigOption<String>(HBASE_NS, "table",
- "The name of the table Titan will use. When " + ConfigElement.getPath(SKIP_SCHEMA_CHECK) +
- " is false, Titan will automatically create this table if it does not already exist.",
- ConfigOption.Type.LOCAL, "titan");
-
- /**
- * Related bug fixed in 0.98.0, 0.94.7, 0.95.0:
- *
- * https://issues.apache.org/jira/browse/HBASE-8170
- */
- public static final int MIN_REGION_COUNT = 3;
-
- /**
- * The total number of HBase regions to create with Titan's table. This
- * setting only affects table creation; this normally happens just once when
- * Titan connects to an HBase backend for the first time.
- */
- public static final ConfigOption<Integer> REGION_COUNT =
- new ConfigOption<Integer>(HBASE_NS, "region-count",
- "The number of initial regions set when creating Titan's HBase table",
- ConfigOption.Type.MASKABLE, Integer.class, new Predicate<Integer>() {
- @Override
- public boolean apply(Integer input) {
- return null != input && MIN_REGION_COUNT <= input;
- }
- }
- );
-
- /**
- * This setting is used only when {@link #REGION_COUNT} is unset.
- * <p/>
- * If Titan's HBase table does not exist, then it will be created with total
- * region count = (number of servers reported by ClusterStatus) * (this
- * value).
- * <p/>
- * The Apache HBase manual suggests an order-of-magnitude range of potential
- * values for this setting:
- *
- * <ul>
- * <li>
- * <a href="https://hbase.apache.org/book/important_configurations.html#disable.splitting">2.5.2.7. Managed Splitting</a>:
- * <blockquote>
- * What's the optimal number of pre-split regions to create? Mileage will
- * vary depending upon your application. You could start low with 10
- * pre-split regions / server and watch as data grows over time. It's
- * better to err on the side of too little regions and rolling split later.
- * </blockquote>
- * </li>
- * <li>
- * <a href="https://hbase.apache.org/book/regions.arch.html">9.7 Regions</a>:
- * <blockquote>
- * In general, HBase is designed to run with a small (20-200) number of
- * relatively large (5-20Gb) regions per server... Typically you want to
- * keep your region count low on HBase for numerous reasons. Usually
- * right around 100 regions per RegionServer has yielded the best results.
- * </blockquote>
- * </li>
- * </ul>
- *
- * These considerations may differ for other HBase implementations (e.g. MapR).
- */
- public static final ConfigOption<Integer> REGIONS_PER_SERVER =
- new ConfigOption<Integer>(HBASE_NS, "regions-per-server",
- "The number of regions per regionserver to set when creating Titan's HBase table",
- ConfigOption.Type.MASKABLE, Integer.class);
-
- /**
- * If this key is present in either the JVM system properties or the process
- * environment (checked in the listed order, first hit wins), then its value
- * must be the full package and class name of an implementation of
- * {@link HBaseCompat} that has a no-arg public constructor.
- * <p>
- * When this <b>is not</b> set, Titan attempts to automatically detect the
- * HBase runtime version by calling {@link VersionInfo#getVersion()}. Titan
- * then checks the returned version string against a hard-coded list of
- * supported version prefixes and instantiates the associated compat layer
- * if a match is found.
- * <p>
- * When this <b>is</b> set, Titan will not call
- * {@code VersionInfo.getVersion()} or read its hard-coded list of supported
- * version prefixes. Titan will instead attempt to instantiate the class
- * specified (via the no-arg constructor which must exist) and then attempt
- * to cast it to HBaseCompat and use it as such. Titan will assume the
- * supplied implementation is compatible with the runtime HBase version and
- * make no attempt to verify that assumption.
- * <p>
- * Setting this key incorrectly could cause runtime exceptions at best or
- * silent data corruption at worst. This setting is intended for users
- * running exotic HBase implementations that don't support VersionInfo or
- * implementations which return values from {@code VersionInfo.getVersion()}
- * that are inconsistent with Apache's versioning convention. It may also be
- * useful to users who want to run against a new release of HBase that Titan
- * doesn't yet officially support.
- *
- */
- public static final ConfigOption<String> COMPAT_CLASS =
- new ConfigOption<String>(HBASE_NS, "compat-class",
- "The package and class name of the HBaseCompat implementation. HBaseCompat masks version-specific HBase API differences. " +
- "When this option is unset, Titan calls HBase's VersionInfo.getVersion() and loads the matching compat class " +
- "at runtime. Setting this option forces Titan to instead reflectively load and instantiate the specified class.",
- ConfigOption.Type.MASKABLE, String.class);
-
- public static final int PORT_DEFAULT = 9160;
-
- public static final Timestamps PREFERRED_TIMESTAMPS = Timestamps.MILLI;
-
- public static final ConfigNamespace HBASE_CONFIGURATION_NAMESPACE =
- new ConfigNamespace(HBASE_NS, "ext", "Overrides for hbase-{site,default}.xml options", true);
-
- private static final BiMap<String, String> SHORT_CF_NAME_MAP =
- ImmutableBiMap.<String, String>builder()
- .put(Backend.INDEXSTORE_NAME, "g")
- .put(Backend.INDEXSTORE_NAME + Backend.LOCK_STORE_SUFFIX, "h")
- .put(Backend.ID_STORE_NAME, "i")
- .put(Backend.EDGESTORE_NAME, "e")
- .put(Backend.EDGESTORE_NAME + Backend.LOCK_STORE_SUFFIX, "f")
- .put(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME, "s")
- .put(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME + Backend.LOCK_STORE_SUFFIX, "t")
- .put(Backend.SYSTEM_MGMT_LOG_NAME, "m")
- .put(Backend.SYSTEM_TX_LOG_NAME, "l")
- .build();
-
- private static final StaticBuffer FOUR_ZERO_BYTES = BufferUtil.zeroBuffer(4);
-
- static {
- // Verify that shortCfNameMap is injective
- // Should be guaranteed by Guava BiMap, but it doesn't hurt to check
- Preconditions.checkArgument(null != SHORT_CF_NAME_MAP);
- Collection<String> shorts = SHORT_CF_NAME_MAP.values();
- Preconditions.checkArgument(Sets.newHashSet(shorts).size() == shorts.size());
- }
-
- // Immutable instance fields
- private final String tableName;
- private final String compression;
- private final int regionCount;
- private final int regionsPerServer;
- private final ConnectionMask cnx;
- private final org.apache.hadoop.conf.Configuration hconf;
- private final boolean shortCfNames;
- private final boolean skipSchemaCheck;
- private final String compatClass;
- private final HBaseCompat compat;
-
- private static final ConcurrentHashMap<HBaseStoreManager, Throwable> openManagers =
- new ConcurrentHashMap<HBaseStoreManager, Throwable>();
-
- // Mutable instance state
- private final ConcurrentMap<String, HBaseKeyColumnValueStore> openStores;
-
- private LocalLockMediator<StoreTransaction> llm;
-
- public HBaseStoreManager(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) throws BackendException {
- super(config, PORT_DEFAULT);
-
- checkConfigDeprecation(config);
-
- this.tableName = config.get(HBASE_TABLE);
- this.compression = config.get(COMPRESSION);
- this.regionCount = config.has(REGION_COUNT) ? config.get(REGION_COUNT) : -1;
- this.regionsPerServer = config.has(REGIONS_PER_SERVER) ? config.get(REGIONS_PER_SERVER) : -1;
- this.skipSchemaCheck = config.get(SKIP_SCHEMA_CHECK);
- this.compatClass = config.has(COMPAT_CLASS) ? config.get(COMPAT_CLASS) : null;
- this.compat = HBaseCompatLoader.getCompat(compatClass);
-
- /*
- * Specifying both region count options is permitted but may be
- * indicative of a misunderstanding, so issue a warning.
- */
- if (config.has(REGIONS_PER_SERVER) && config.has(REGION_COUNT)) {
- logger.warn("Both {} and {} are set in Titan's configuration, but "
- + "the former takes precedence and the latter will be ignored.",
- REGION_COUNT, REGIONS_PER_SERVER);
- }
-
- /* This static factory calls HBaseConfiguration.addHbaseResources(),
- * which in turn applies the contents of hbase-default.xml and then
- * applies the contents of hbase-site.xml.
- */
- this.hconf = HBaseConfiguration.create();
-
- // Copy a subset of our commons config into a Hadoop config
- int keysLoaded=0;
- Map<String,Object> configSub = config.getSubset(HBASE_CONFIGURATION_NAMESPACE);
- for (Map.Entry<String,Object> entry : configSub.entrySet()) {
- logger.info("HBase configuration: setting {}={}", entry.getKey(), entry.getValue());
- if (entry.getValue()==null) continue;
- hconf.set(entry.getKey(), entry.getValue().toString());
- keysLoaded++;
- }
-
- // Special case for STORAGE_HOSTS
- if (config.has(GraphDatabaseConfiguration.STORAGE_HOSTS)) {
- String zkQuorumKey = "hbase.zookeeper.quorum";
- String csHostList = Joiner.on(",").join(config.get(GraphDatabaseConfiguration.STORAGE_HOSTS));
- hconf.set(zkQuorumKey, csHostList);
- logger.info("Copied host list from {} to {}: {}", GraphDatabaseConfiguration.STORAGE_HOSTS, zkQuorumKey, csHostList);
- }
-
- logger.debug("HBase configuration: set a total of {} configuration values", keysLoaded);
-
- this.shortCfNames = config.get(SHORT_CF_NAMES);
-
- try {
- //this.cnx = HConnectionManager.createConnection(hconf);
- this.cnx = compat.createConnection(hconf);
- } catch (IOException e) {
- throw new PermanentBackendException(e);
- }
-
- if (logger.isTraceEnabled()) {
- openManagers.put(this, new Throwable("Manager Opened"));
- dumpOpenManagers();
- }
-
- logger.debug("Dumping HBase config key=value pairs");
- for (Map.Entry<String, String> entry : hconf) {
- logger.debug("[HBaseConfig] " + entry.getKey() + "=" + entry.getValue());
- }
- logger.debug("End of HBase config key=value pairs");
-
- openStores = new ConcurrentHashMap<String, HBaseKeyColumnValueStore>();
- }
-
- @Override
- public Deployment getDeployment() {
- return Deployment.REMOTE;
-
- /* If just one of the regions for titan table is in the localhost,
- * this method returns Deployment.LOCAL - which does not sound right.
- *
- List<KeyRange> local;
- try {
- local = getLocalKeyPartition();
- return null != local && !local.isEmpty() ? Deployment.LOCAL : Deployment.REMOTE;
- } catch (BackendException e) {
- // propagating StorageException might be a better approach
- throw new RuntimeException(e);
- }
- *
- */
- }
-
- @Override
- public String toString() {
- return "hbase[" + tableName + "@" + super.toString() + "]";
- }
-
- public void dumpOpenManagers() {
- int estimatedSize = openManagers.size();
- logger.trace("---- Begin open HBase store manager list ({} managers) ----", estimatedSize);
- for (HBaseStoreManager m : openManagers.keySet()) {
- logger.trace("Manager {} opened at:", m, openManagers.get(m));
- }
- logger.trace("---- End open HBase store manager list ({} managers) ----", estimatedSize);
- }
-
- @Override
- public void close() {
- openStores.clear();
- if (logger.isTraceEnabled())
- openManagers.remove(this);
- IOUtils.closeQuietly(cnx);
- }
-
- @Override
- public StoreFeatures getFeatures() {
-
- Configuration c = GraphDatabaseConfiguration.buildConfiguration();
-
- StandardStoreFeatures.Builder fb = new StandardStoreFeatures.Builder()
- .orderedScan(true).unorderedScan(true).batchMutation(true)
- .multiQuery(true).distributed(true).keyOrdered(true).storeTTL(true)
- .timestamps(true).preferredTimestamps(PREFERRED_TIMESTAMPS)
- .locking(true)
- .keyConsistent(c);
-
- try {
- fb.localKeyPartition(getDeployment() == Deployment.LOCAL);
- } catch (Exception e) {
- logger.warn("Unexpected exception during getDeployment()", e);
- }
-
- return fb.build();
- }
-
- @Override
- public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
- logger.debug("Enter mutateMany");
- final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
- // In case of an addition and deletion with identical timestamps, the
- // deletion tombstone wins.
- // http://hbase.apache.org/book/versions.html#d244e4250
- Map<StaticBuffer, Pair<Put, Delete>> commandsPerKey =
- convertToCommands(
- mutations,
- commitTime.getAdditionTime(times.getUnit()),
- commitTime.getDeletionTime(times.getUnit()));
-
- List<Row> batch = new ArrayList<Row>(commandsPerKey.size()); // actual batch operation
-
- // convert sorted commands into representation required for 'batch' operation
- for (Pair<Put, Delete> commands : commandsPerKey.values()) {
- if (commands.getFirst() != null)
- batch.add(commands.getFirst());
-
- if (commands.getSecond() != null)
- batch.add(commands.getSecond());
- }
-
- try {
- TableMask table = null;
-
- try {
- table = cnx.getTable(tableName);
- logger.debug("mutateMany : batch mutate started size {} ", batch.size());
- table.batch(batch, new Object[batch.size()]);
- logger.debug("mutateMany : batch mutate finished {} ", batch.size());
- } finally {
- IOUtils.closeQuietly(table);
- }
- } catch (IOException e) {
- throw new TemporaryBackendException(e);
- } catch (InterruptedException e) {
- throw new TemporaryBackendException(e);
- }
-
- sleepAfterWrite(txh, commitTime);
- }
-
- @Override
- public KeyColumnValueStore openDatabase(String longName) throws BackendException {
-
- return openDatabase(longName, -1);
- }
-
- @Override
- public KeyColumnValueStore openDatabase(final String longName, int ttlInSeconds) throws BackendException {
-
- HBaseKeyColumnValueStore store = openStores.get(longName);
-
- if (store == null) {
- final String cfName = shortCfNames ? shortenCfName(longName) : longName;
-
- final String llmPrefix = getName();
- llm = LocalLockMediators.INSTANCE.<StoreTransaction>get(llmPrefix, times);
- HBaseKeyColumnValueStore newStore = new HBaseKeyColumnValueStore(this, cnx, tableName, cfName, longName, llm);
-
- store = openStores.putIfAbsent(longName, newStore); // nothing bad happens if we loose to other thread
-
- if (store == null) {
- if (!skipSchemaCheck)
- ensureColumnFamilyExists(tableName, cfName, ttlInSeconds);
-
- store = newStore;
- }
- logger.info("Loaded 1.x Hbase Client Store Manager");
- }
-
- return store;
- }
-
-
- @Override
- public StoreTransaction beginTransaction(final BaseTransactionConfig config) throws BackendException {
- return new HBaseTransaction(config, llm);
- }
-
- @Override
- public String getName() {
- return tableName;
- }
-
- /**
- * Deletes the specified table with all its columns.
- * ATTENTION: Invoking this method will delete the table if it exists and therefore causes data loss.
- */
- @Override
- public void clearStorage() throws BackendException {
- try (AdminMask adm = getAdminInterface()) {
- adm.clearTable(tableName, times.getTime().getNativeTimestamp());
- } catch (IOException e)
- {
- throw new TemporaryBackendException(e);
- }
- }
-
- @Override
- public List<KeyRange> getLocalKeyPartition() throws BackendException {
-
- List<KeyRange> result = new LinkedList<KeyRange>();
-
- TableMask table = null;
- try {
- ensureTableExists(tableName, getCfNameForStoreName(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME), 0);
-
- table = cnx.getTable(tableName);
-
- HTable hTable = (HTable)table.getTableObject();
-
- Map<KeyRange, ServerName> normed =
- normalizeKeyBounds(hTable.getRegionLocations());
-
- for (Map.Entry<KeyRange, ServerName> e : normed.entrySet()) {
- if (NetworkUtil.isLocalConnection(e.getValue().getHostname())) {
- result.add(e.getKey());
- logger.debug("Found local key/row partition {} on host {}", e.getKey(), e.getValue());
- } else {
- logger.debug("Discarding remote {}", e.getValue());
- }
- }
- } catch (MasterNotRunningException e) {
- logger.warn("Unexpected MasterNotRunningException", e);
- } catch (ZooKeeperConnectionException e) {
- logger.warn("Unexpected ZooKeeperConnectionException", e);
- } catch (IOException e) {
- logger.warn("Unexpected IOException", e);
- } finally {
- IOUtils.closeQuietly(table);
- }
- return result;
- }
-
- /**
- * Given a map produced by {@link HTable#getRegionLocations()}, transform
- * each key from an {@link HRegionInfo} to a {@link KeyRange} expressing the
- * region's start and end key bounds using Titan-partitioning-friendly
- * conventions (start inclusive, end exclusive, zero bytes appended where
- * necessary to make all keys at least 4 bytes long).
- * <p/>
- * This method iterates over the entries in its map parameter and performs
- * the following conditional conversions on its keys. "Require" below means
- * either a {@link Preconditions} invocation or an assertion. HRegionInfo
- * sometimes returns start and end keys of zero length; this method replaces
- * zero length keys with null before doing any of the checks described
- * below. The parameter map and the values it contains are only read and
- * never modified.
- *
- * <ul>
- * <li>If an entry's HRegionInfo has null start and end keys, then first
- * require that the parameter map is a singleton, and then return a
- * single-entry map whose {@code KeyRange} has start and end buffers that
- * are both four bytes of zeros.</li>
- * <li>If the entry has a null end key (but non-null start key), put an
- * equivalent entry in the result map with a start key identical to the
- * input, except that zeros are appended to values less than 4 bytes long,
- * and an end key that is four bytes of zeros.
- * <li>If the entry has a null start key (but non-null end key), put an
- * equivalent entry in the result map where the start key is four bytes of
- * zeros, and the end key has zeros appended, if necessary, to make it at
- * least 4 bytes long, after which one is added to the padded value in
- * unsigned 32-bit arithmetic with overflow allowed.</li>
- * <li>Any entry which matches none of the above criteria results in an
- * equivalent entry in the returned map, except that zeros are appended to
- * both keys to make each at least 4 bytes long, and the end key is then
- * incremented as described in the last bullet point.</li>
- * </ul>
- *
- * After iterating over the parameter map, this method checks that it either
- * saw no entries with null keys, one entry with a null start key and a
- * different entry with a null end key, or one entry with both start and end
- * keys null. If any null keys are observed besides these three cases, the
- * method will die with a precondition failure.
- *
- * @param raw
- * A map of HRegionInfo and ServerName from HBase
- * @return Titan-friendly expression of each region's rowkey boundaries
- */
- private Map<KeyRange, ServerName> normalizeKeyBounds(NavigableMap<HRegionInfo, ServerName> raw) {
-
- Map.Entry<HRegionInfo, ServerName> nullStart = null;
- Map.Entry<HRegionInfo, ServerName> nullEnd = null;
-
- ImmutableMap.Builder<KeyRange, ServerName> b = ImmutableMap.builder();
-
- for (Map.Entry<HRegionInfo, ServerName> e : raw.entrySet()) {
- HRegionInfo regionInfo = e.getKey();
- byte startKey[] = regionInfo.getStartKey();
- byte endKey[] = regionInfo.getEndKey();
-
- if (0 == startKey.length) {
- startKey = null;
- logger.trace("Converted zero-length HBase startKey byte array to null");
- }
-
- if (0 == endKey.length) {
- endKey = null;
- logger.trace("Converted zero-length HBase endKey byte array to null");
- }
-
- if (null == startKey && null == endKey) {
- Preconditions.checkState(1 == raw.size());
- logger.debug("HBase table {} has a single region {}", tableName, regionInfo);
- // Choose arbitrary shared value = startKey = endKey
- return b.put(new KeyRange(FOUR_ZERO_BYTES, FOUR_ZERO_BYTES), e.getValue()).build();
- } else if (null == startKey) {
- logger.debug("Found HRegionInfo with null startKey on server {}: {}", e.getValue(), regionInfo);
- Preconditions.checkState(null == nullStart);
- nullStart = e;
- // I thought endBuf would be inclusive from the HBase javadoc, but in practice it is exclusive
- StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey));
- // Replace null start key with zeroes
- b.put(new KeyRange(FOUR_ZERO_BYTES, endBuf), e.getValue());
- } else if (null == endKey) {
- logger.debug("Found HRegionInfo with null endKey on server {}: {}", e.getValue(), regionInfo);
- Preconditions.checkState(null == nullEnd);
- nullEnd = e;
- // Replace null end key with zeroes
- b.put(new KeyRange(StaticArrayBuffer.of(zeroExtend(startKey)), FOUR_ZERO_BYTES), e.getValue());
- } else {
- Preconditions.checkState(null != startKey);
- Preconditions.checkState(null != endKey);
-
- // Convert HBase's inclusive end keys into exclusive Titan end keys
- StaticBuffer startBuf = StaticArrayBuffer.of(zeroExtend(startKey));
- StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey));
-
- KeyRange kr = new KeyRange(startBuf, endBuf);
- b.put(kr, e.getValue());
- logger.debug("Found HRegionInfo with non-null end and start keys on server {}: {}", e.getValue(), regionInfo);
- }
- }
-
- // Require either no null key bounds or a pair of them
- Preconditions.checkState(!(null == nullStart ^ null == nullEnd));
-
- // Check that every key in the result is at least 4 bytes long
- Map<KeyRange, ServerName> result = b.build();
- for (KeyRange kr : result.keySet()) {
- Preconditions.checkState(4 <= kr.getStart().length());
- Preconditions.checkState(4 <= kr.getEnd().length());
- }
-
- return result;
- }
-
- /**
- * If the parameter is shorter than 4 bytes, then create and return a new 4
- * byte array with the input array's bytes followed by zero bytes. Otherwise
- * return the parameter.
- *
- * @param dataToPad non-null but possibly zero-length byte array
- * @return either the parameter or a new array
- */
- private final byte[] zeroExtend(byte[] dataToPad) {
- assert null != dataToPad;
-
- final int targetLength = 4;
-
- if (targetLength <= dataToPad.length)
- return dataToPad;
-
- byte padded[] = new byte[targetLength];
-
- for (int i = 0; i < dataToPad.length; i++)
- padded[i] = dataToPad[i];
-
- for (int i = dataToPad.length; i < padded.length; i++)
- padded[i] = (byte)0;
-
- return padded;
- }
-
- public static String shortenCfName(String longName) throws PermanentBackendException {
- final String s;
- if (SHORT_CF_NAME_MAP.containsKey(longName)) {
- s = SHORT_CF_NAME_MAP.get(longName);
- Preconditions.checkNotNull(s);
- logger.debug("Substituted default CF name \"{}\" with short form \"{}\" to reduce HBase KeyValue size", longName, s);
- } else {
- if (SHORT_CF_NAME_MAP.containsValue(longName)) {
- String fmt = "Must use CF long-form name \"%s\" instead of the short-form name \"%s\" when configured with %s=true";
- String msg = String.format(fmt, SHORT_CF_NAME_MAP.inverse().get(longName), longName, SHORT_CF_NAMES.getName());
- throw new PermanentBackendException(msg);
- }
- s = longName;
- logger.debug("Kept default CF name \"{}\" because it has no associated short form", s);
- }
- return s;
- }
-
- private HTableDescriptor ensureTableExists(String tableName, String initialCFName, int ttlInSeconds) throws BackendException {
- AdminMask adm = null;
-
- HTableDescriptor desc;
-
- try { // Create our table, if necessary
- adm = getAdminInterface();
- /*
- * Some HBase versions/impls respond badly to attempts to create a
- * table without at least one CF. See #661. Creating a CF along with
- * the table avoids HBase carping.
- */
- if (adm.tableExists(tableName)) {
- desc = adm.getTableDescriptor(tableName);
- } else {
- desc = createTable(tableName, initialCFName, ttlInSeconds, adm);
- }
- } catch (IOException e) {
- throw new TemporaryBackendException(e);
- } finally {
- IOUtils.closeQuietly(adm);
- }
-
- return desc;
- }
-
- private HTableDescriptor createTable(String tableName, String cfName, int ttlInSeconds, AdminMask adm) throws IOException {
- HTableDescriptor desc = compat.newTableDescriptor(tableName);
-
- HColumnDescriptor cdesc = new HColumnDescriptor(cfName);
- setCFOptions(cdesc, ttlInSeconds);
-
- compat.addColumnFamilyToTableDescriptor(desc, cdesc);
-
- int count; // total regions to create
- String src;
-
- if (MIN_REGION_COUNT <= (count = regionCount)) {
- src = "region count configuration";
- } else if (0 < regionsPerServer &&
- MIN_REGION_COUNT <= (count = regionsPerServer * adm.getEstimatedRegionServerCount())) {
- src = "ClusterStatus server count";
- } else {
- count = -1;
- src = "default";
- }
-
- if (MIN_REGION_COUNT < count) {
- adm.createTable(desc, getStartKey(count), getEndKey(count), count);
- logger.debug("Created table {} with region count {} from {}", tableName, count, src);
- } else {
- adm.createTable(desc);
- logger.debug("Created table {} with default start key, end key, and region count", tableName);
- }
-
- return desc;
- }
-
- /**
- * This method generates the second argument to
- * {@link HBaseAdmin#createTable(HTableDescriptor, byte[], byte[], int)}.
- * <p/>
- * From the {@code createTable} javadoc:
- * "The start key specified will become the end key of the first region of
- * the table, and the end key specified will become the start key of the
- * last region of the table (the first region has a null start key and
- * the last region has a null end key)"
- * <p/>
- * To summarize, the {@code createTable} argument called "startKey" is
- * actually the end key of the first region.
- */
- private byte[] getStartKey(int regionCount) {
- ByteBuffer regionWidth = ByteBuffer.allocate(4);
- regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount)).flip();
- return StaticArrayBuffer.of(regionWidth).getBytes(0, 4);
- }
-
- /**
- * Companion to {@link #getStartKey(int)}. See its javadoc for details.
- */
- private byte[] getEndKey(int regionCount) {
- ByteBuffer regionWidth = ByteBuffer.allocate(4);
- regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount * (regionCount - 1))).flip();
- return StaticArrayBuffer.of(regionWidth).getBytes(0, 4);
- }
-
- private void ensureColumnFamilyExists(String tableName, String columnFamily, int ttlInSeconds) throws BackendException {
- AdminMask adm = null;
- try {
- adm = getAdminInterface();
- HTableDescriptor desc = ensureTableExists(tableName, columnFamily, ttlInSeconds);
-
- Preconditions.checkNotNull(desc);
-
- HColumnDescriptor cf = desc.getFamily(columnFamily.getBytes());
-
- // Create our column family, if necessary
- if (cf == null) {
- try {
- if (!adm.isTableDisabled(tableName)) {
- adm.disableTable(tableName);
- }
- } catch (TableNotEnabledException e) {
- logger.debug("Table {} already disabled", tableName);
- } catch (IOException e) {
- throw new TemporaryBackendException(e);
- }
-
- try {
- HColumnDescriptor cdesc = new HColumnDescriptor(columnFamily);
-
- setCFOptions(cdesc, ttlInSeconds);
-
- adm.addColumn(tableName, cdesc);
-
- logger.debug("Added HBase ColumnFamily {}, waiting for 1 sec. to propogate.", columnFamily);
-
- adm.enableTable(tableName);
- } catch (TableNotFoundException ee) {
- logger.error("TableNotFoundException", ee);
- throw new PermanentBackendException(ee);
- } catch (org.apache.hadoop.hbase.TableExistsException ee) {
- logger.debug("Swallowing exception {}", ee);
- } catch (IOException ee) {
- throw new TemporaryBackendException(ee);
- }
- }
- } finally {
- IOUtils.closeQuietly(adm);
- }
- }
-
- private void setCFOptions(HColumnDescriptor cdesc, int ttlInSeconds) {
- if (null != compression && !compression.equals(COMPRESSION_DEFAULT))
- compat.setCompression(cdesc, compression);
-
- if (ttlInSeconds > 0)
- cdesc.setTimeToLive(ttlInSeconds);
-
- cdesc.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
- }
-
- /**
- * Convert Titan internal Mutation representation into HBase native commands.
- *
- * @param mutations Mutations to convert into HBase commands.
- * @param putTimestamp The timestamp to use for Put commands.
- * @param delTimestamp The timestamp to use for Delete commands.
- * @return Commands sorted by key converted from Titan internal representation.
- * @throws com.thinkaurelius.titan.diskstorage.PermanentBackendException
- */
- private Map<StaticBuffer, Pair<Put, Delete>> convertToCommands(Map<String, Map<StaticBuffer, KCVMutation>> mutations,
- final long putTimestamp,
- final long delTimestamp) throws PermanentBackendException {
- Map<StaticBuffer, Pair<Put, Delete>> commandsPerKey = new HashMap<StaticBuffer, Pair<Put, Delete>>();
-
- for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> entry : mutations.entrySet()) {
-
- String cfString = getCfNameForStoreName(entry.getKey());
- byte[] cfName = cfString.getBytes();
-
- for (Map.Entry<StaticBuffer, KCVMutation> m : entry.getValue().entrySet()) {
- byte[] key = m.getKey().as(StaticBuffer.ARRAY_FACTORY);
- KCVMutation mutation = m.getValue();
-
- Pair<Put, Delete> commands = commandsPerKey.get(m.getKey());
-
- if (commands == null) {
- commands = new Pair<Put, Delete>();
- commandsPerKey.put(m.getKey(), commands);
- }
-
- if (mutation.hasDeletions()) {
- if (commands.getSecond() == null) {
- Delete d = new Delete(key);
- compat.setTimestamp(d, delTimestamp);
- commands.setSecond(d);
- }
-
- for (StaticBuffer b : mutation.getDeletions()) {
- commands.getSecond().deleteColumns(cfName, b.as(StaticBuffer.ARRAY_FACTORY), delTimestamp);
- }
- }
-
- if (mutation.hasAdditions()) {
- if (commands.getFirst() == null) {
- Put p = new Put(key, putTimestamp);
- commands.setFirst(p);
- }
-
- for (Entry e : mutation.getAdditions()) {
- commands.getFirst().add(cfName,
- e.getColumnAs(StaticBuffer.ARRAY_FACTORY),
- putTimestamp,
- e.getValueAs(StaticBuffer.ARRAY_FACTORY));
- }
- }
- }
- }
-
- return commandsPerKey;
- }
-
- private String getCfNameForStoreName(String storeName) throws PermanentBackendException {
- return shortCfNames ? shortenCfName(storeName) : storeName;
- }
-
- private void checkConfigDeprecation(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) {
- if (config.has(GraphDatabaseConfiguration.STORAGE_PORT)) {
- logger.warn("The configuration property {} is ignored for HBase. Set hbase.zookeeper.property.clientPort in hbase-site.xml or {}.hbase.zookeeper.property.clientPort in Titan's configuration file.",
- ConfigElement.getPath(GraphDatabaseConfiguration.STORAGE_PORT), ConfigElement.getPath(HBASE_CONFIGURATION_NAMESPACE));
- }
- }
-
- private AdminMask getAdminInterface() {
- try {
- return cnx.getAdmin();
- } catch (IOException e) {
- throw new TitanException(e);
- }
- }
-
- /**
- * Similar to {@link Function}, except that the {@code apply} method is allowed
- * to throw {@link BackendException}.
- */
- private static interface BackendFunction<F, T> {
-
- T apply(F input) throws BackendException;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java
deleted file mode 100644
index e13593f..0000000
--- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import com.thinkaurelius.titan.diskstorage.BackendException;
-import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
-import com.thinkaurelius.titan.diskstorage.StaticBuffer;
-import com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction;
-import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
-import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
-import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.LinkedHashSet;
-import java.util.Set;
-
-/**
- * This class overrides and adds nothing compared with
- * {@link com.thinkaurelius.titan.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction}; however, it creates a transaction type specific
- * to HBase, which lets us check for user errors like passing a Cassandra
- * transaction into a HBase method.
- *
- * @author Dan LaRocque <da...@hopcount.org>
- */
-public class HBaseTransaction extends AbstractStoreTransaction {
-
- private static final Logger log = LoggerFactory.getLogger(HBaseTransaction.class);
-
- LocalLockMediator<StoreTransaction> llm;
-
- Set<KeyColumn> keyColumnLocks = new LinkedHashSet<>();
-
- public HBaseTransaction(final BaseTransactionConfig config, LocalLockMediator<StoreTransaction> llm) {
- super(config);
- this.llm = llm;
- }
-
- @Override
- public synchronized void rollback() throws BackendException {
- super.rollback();
- log.debug("Rolled back transaction");
- deleteAllLocks();
- }
-
- @Override
- public synchronized void commit() throws BackendException {
- super.commit();
- log.debug("Committed transaction");
- deleteAllLocks();
- }
-
- public void updateLocks(KeyColumn lockID, StaticBuffer expectedValue) {
- keyColumnLocks.add(lockID);
- }
-
- private void deleteAllLocks() {
- for(KeyColumn kc : keyColumnLocks) {
- log.debug("Removed lock {} ", kc);
- llm.unlock(kc, this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java
deleted file mode 100644
index 8660644..0000000
--- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-
-public class HConnection0_98 implements ConnectionMask
-{
-
- private final HConnection cnx;
-
- public HConnection0_98(HConnection cnx)
- {
- this.cnx = cnx;
- }
-
- @Override
- public TableMask getTable(String name) throws IOException
- {
- return new HTable0_98(cnx.getTable(name));
- }
-
- @Override
- public AdminMask getAdmin() throws IOException
- {
- return new HBaseAdmin0_98(new HBaseAdmin(cnx));
- }
-
- @Override
- public void close() throws IOException
- {
- cnx.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java
deleted file mode 100644
index 91e5026..0000000
--- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-
-public class HConnection1_0 implements ConnectionMask
-{
-
- private final Connection cnx;
-
- public HConnection1_0(Connection cnx)
- {
- this.cnx = cnx;
- }
-
- @Override
- public TableMask getTable(String name) throws IOException
- {
- return new HTable1_0(cnx.getTable(TableName.valueOf(name)));
- }
-
- @Override
- public AdminMask getAdmin() throws IOException
- {
- return new HBaseAdmin1_0(new HBaseAdmin(cnx));
- }
-
- @Override
- public void close() throws IOException
- {
- cnx.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java
deleted file mode 100644
index b11532a..0000000
--- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.Scan;
-
-/** {@link TableMask} implementation that forwards every call to a wrapped {@link HTableInterface}. */
-public class HTable0_98 implements TableMask {
-    private final HTableInterface delegate;
-
-    /** @param table table that every call on this object is forwarded to */
-    public HTable0_98(HTableInterface table) {
-        delegate = table;
-    }
-
-    /** Returns the scanner that the wrapped table produces for {@code filter}. */
-    @Override
-    public ResultScanner getScanner(Scan filter) throws IOException {
-        return delegate.getScanner(filter);
-    }
-
-    /** Returns whatever the wrapped table returns for the given list of gets. */
-    @Override
-    public Result[] get(List<Get> gets) throws IOException {
-        return delegate.get(gets);
-    }
-
-    /** Passes both arguments to the wrapped table's batch call, then calls {@code flushCommits()} on it. */
-    @Override
-    public void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException {
-        delegate.batch(writes, results);
-        delegate.flushCommits();
-    }
-
-    @Override
-    public void close() throws IOException {
-        delegate.close();
-    }
-
-    /** Returns the wrapped {@link HTableInterface} itself. */
-    @Override
-    public Object getTableObject() {
-        return delegate;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java
deleted file mode 100644
index 5c90617..0000000
--- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-
-/** {@link TableMask} implementation that forwards every call to a wrapped {@link Table}. */
-public class HTable1_0 implements TableMask {
-    private final Table delegate;
-
-    /** @param table table that every call on this object is forwarded to */
-    public HTable1_0(Table table) {
-        delegate = table;
-    }
-
-    /** Returns the scanner that the wrapped table produces for {@code filter}. */
-    @Override
-    public ResultScanner getScanner(Scan filter) throws IOException {
-        return delegate.getScanner(filter);
-    }
-
-    /** Returns whatever the wrapped table returns for the given list of gets. */
-    @Override
-    public Result[] get(List<Get> gets) throws IOException {
-        return delegate.get(gets);
-    }
-
-    /** Passes both arguments straight to the wrapped table's batch call. */
-    @Override
-    public void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException {
-        delegate.batch(writes, results);
-        // Unlike HTable0_98, no flushCommits() call follows; it was noted as "not needed anymore".
-    }
-
-    @Override
-    public void close() throws IOException {
-        delegate.close();
-    }
-
-    /** Returns the wrapped {@link Table} itself. */
-    @Override
-    public Object getTableObject() {
-        return delegate;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java
deleted file mode 100644
index 54f8743..0000000
--- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.Scan;
-
-/**
- * Hides the ABI/API breaking changes that HBase has made to its Table/HTableInterface while
- * evolving from 0.94 to 1.0 and beyond.
- */
-public interface TableMask extends Closeable {
-
-    /** Returns a {@link ResultScanner} for the given {@link Scan}. */
-    ResultScanner getScanner(Scan filter) throws IOException;
-    /** Returns the {@link Result}s obtained for the given list of {@link Get}s. */
-    Result[] get(List<Get> gets) throws IOException;
-    /** Runs a batch; HTable0_98 and HTable1_0 pass both arguments to the wrapped table's batch call. */
-    void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException;
-    /** Returns the table object behind this mask; HTable0_98 and HTable1_0 return the table they wrap. */
-    Object getTableObject();
-}