You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by su...@apache.org on 2015/12/03 06:46:16 UTC
[5/5] incubator-atlas git commit: ATLAS-352 Improve write performance on type and entity creation with Hbase(sumasai)
ATLAS-352 Improve write performance on type and entity creation with Hbase(sumasai)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/919120f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/919120f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/919120f6
Branch: refs/heads/master
Commit: 919120f65f551f80a02d2782cb4e01f87202f7a5
Parents: 91ad021
Author: Suma Shivaprasad <su...@gmail.com>
Authored: Thu Dec 3 11:07:25 2015 +0530
Committer: Suma Shivaprasad <su...@gmail.com>
Committed: Thu Dec 3 11:07:25 2015 +0530
----------------------------------------------------------------------
distro/pom.xml | 10 +
distro/src/bin/atlas_start.py | 1 +
.../src/main/assemblies/standalone-package.xml | 2 +-
distro/src/test/python/scripts/TestMetadata.py | 10 +-
pom.xml | 7 +
release-log.txt | 1 +
repository/pom.xml | 51 +-
.../titan/diskstorage/hbase/AdminMask.java | 62 --
.../titan/diskstorage/hbase/ConnectionMask.java | 30 -
.../titan/diskstorage/hbase/HBaseAdmin0_98.java | 152 ---
.../titan/diskstorage/hbase/HBaseAdmin1_0.java | 135 ---
.../titan/diskstorage/hbase/HBaseCompat.java | 61 --
.../diskstorage/hbase/HBaseCompat0_98.java | 58 --
.../titan/diskstorage/hbase/HBaseCompat1_0.java | 59 --
.../titan/diskstorage/hbase/HBaseCompat1_1.java | 58 --
.../diskstorage/hbase/HBaseCompatLoader.java | 80 --
.../hbase/HBaseKeyColumnValueStore.java | 368 -------
.../diskstorage/hbase/HBaseStoreManager.java | 925 ------------------
.../diskstorage/hbase/HBaseTransaction.java | 33 -
.../diskstorage/hbase/HConnection0_98.java | 49 -
.../titan/diskstorage/hbase/HConnection1_0.java | 50 -
.../titan/diskstorage/hbase/HTable0_98.java | 60 --
.../titan/diskstorage/hbase/HTable1_0.java | 61 --
.../titan/diskstorage/hbase/TableMask.java | 40 -
.../titan/diskstorage/solr/Solr5Index.java | 972 ------------------
.../apache/atlas/RepositoryMetadataModule.java | 1 +
.../graph/GraphRepoMapperScaleTest.java | 60 +-
titan/pom.xml | 105 ++
.../titan/diskstorage/hbase/AdminMask.java | 62 ++
.../titan/diskstorage/hbase/ConnectionMask.java | 30 +
.../titan/diskstorage/hbase/HBaseAdmin0_98.java | 152 +++
.../titan/diskstorage/hbase/HBaseAdmin1_0.java | 135 +++
.../titan/diskstorage/hbase/HBaseCompat.java | 60 ++
.../diskstorage/hbase/HBaseCompat0_98.java | 58 ++
.../titan/diskstorage/hbase/HBaseCompat1_0.java | 58 ++
.../titan/diskstorage/hbase/HBaseCompat1_1.java | 58 ++
.../diskstorage/hbase/HBaseCompatLoader.java | 80 ++
.../hbase/HBaseKeyColumnValueStore.java | 397 ++++++++
.../diskstorage/hbase/HBaseStoreManager.java | 926 ++++++++++++++++++
.../diskstorage/hbase/HBaseTransaction.java | 75 ++
.../diskstorage/hbase/HConnection0_98.java | 49 +
.../titan/diskstorage/hbase/HConnection1_0.java | 50 +
.../titan/diskstorage/hbase/HTable0_98.java | 60 ++
.../titan/diskstorage/hbase/HTable1_0.java | 60 ++
.../titan/diskstorage/hbase/TableMask.java | 40 +
.../diskstorage/locking/LocalLockMediator.java | 345 +++++++
.../titan/diskstorage/solr/Solr5Index.java | 973 +++++++++++++++++++
.../locking/LocalLockMediatorTest.java | 60 ++
48 files changed, 3885 insertions(+), 3344 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/distro/pom.xml
----------------------------------------------------------------------
diff --git a/distro/pom.xml b/distro/pom.xml
index 1496eec..d88805b 100644
--- a/distro/pom.xml
+++ b/distro/pom.xml
@@ -89,6 +89,16 @@
</profiles>
<build>
+ <outputDirectory>target/bin</outputDirectory>
+ <resources>
+ <resource>
+ <directory>src/bin</directory>
+ <filtering>true</filtering>
+ <includes>
+ <include>**/*.py</include>
+ </includes>
+ </resource>
+ </resources>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/distro/src/bin/atlas_start.py
----------------------------------------------------------------------
diff --git a/distro/src/bin/atlas_start.py b/distro/src/bin/atlas_start.py
index 08e9bdc..fc5860e 100755
--- a/distro/src/bin/atlas_start.py
+++ b/distro/src/bin/atlas_start.py
@@ -58,6 +58,7 @@ def main():
p = os.pathsep
metadata_classpath = confdir + p \
+ os.path.join(web_app_dir, "atlas", "WEB-INF", "classes" ) + p \
+ + os.path.join(web_app_dir, "atlas", "WEB-INF", "lib", "atlas-titan-${project.version}.jar" ) + p \
+ os.path.join(web_app_dir, "atlas", "WEB-INF", "lib", "*" ) + p \
+ os.path.join(metadata_home, "libext", "*")
if os.path.exists(hbase_conf_dir):
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/distro/src/main/assemblies/standalone-package.xml
----------------------------------------------------------------------
diff --git a/distro/src/main/assemblies/standalone-package.xml b/distro/src/main/assemblies/standalone-package.xml
index 4530773..56fe736 100755
--- a/distro/src/main/assemblies/standalone-package.xml
+++ b/distro/src/main/assemblies/standalone-package.xml
@@ -48,7 +48,7 @@
</fileSet>
<fileSet>
- <directory>src/bin</directory>
+ <directory>target/bin</directory>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/distro/src/test/python/scripts/TestMetadata.py
----------------------------------------------------------------------
diff --git a/distro/src/test/python/scripts/TestMetadata.py b/distro/src/test/python/scripts/TestMetadata.py
index 74a8b84..349059f 100644
--- a/distro/src/test/python/scripts/TestMetadata.py
+++ b/distro/src/test/python/scripts/TestMetadata.py
@@ -56,20 +56,16 @@ class TestMetadata(unittest.TestCase):
java_mock.assert_called_with(
'org.apache.atlas.Atlas',
['-app', 'metadata_home\\server\\webapp\\atlas'],
- 'metadata_home\\conf;metadata_home\\server\\webapp\\atlas\\WEB-INF\\classes;metadata_home\\server\\webapp\\atlas\\WEB-INF\\lib\\*;metadata_home\\libext\\*;metadata_home\\hbase\\conf',
+ 'metadata_home\\conf;metadata_home\\server\\webapp\\atlas\\WEB-INF\\classes;metadata_home\\server\\webapp\\atlas\\WEB-INF\\lib\\atlas-titan-${project.version}.jar;metadata_home\\server\\webapp\\atlas\\WEB-INF\\lib\\*;metadata_home\\libext\\*;metadata_home\\hbase\\conf',
['-Datlas.log.dir=metadata_home\\logs', '-Datlas.log.file=application.log', '-Datlas.home=metadata_home', '-Datlas.conf=metadata_home\\conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml'], 'metadata_home\\logs')
-
-
-
else:
java_mock.assert_called_with(
'org.apache.atlas.Atlas',
['-app', 'metadata_home/server/webapp/atlas'],
- 'metadata_home/conf:metadata_home/server/webapp/atlas/WEB-INF/classes:metadata_home/server/webapp/atlas/WEB-INF/lib/*:metadata_home/libext/*:metadata_home/hbase/conf',
+ 'metadata_home/conf:metadata_home/server/webapp/atlas/WEB-INF/classes:metadata_home/server/webapp/atlas/WEB-INF/lib/atlas-titan-${project.version}.jar:metadata_home/server/webapp/atlas/WEB-INF/lib/*:metadata_home/libext/*:metadata_home/hbase/conf',
['-Datlas.log.dir=metadata_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=metadata_home', '-Datlas.conf=metadata_home/conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml'], 'metadata_home/logs')
-
-
+
pass
def test_jar_java_lookups_fail(self):
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c09e640..ede8c53 100755
--- a/pom.xml
+++ b/pom.xml
@@ -409,6 +409,7 @@
<module>typesystem</module>
<module>notification</module>
<module>client</module>
+ <module>titan</module>
<module>repository</module>
<module>dashboard</module>
<module>webapp</module>
@@ -925,6 +926,12 @@
<dependency>
<groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-titan</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
<artifactId>atlas-repository</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 6d4873b..1c71dd1 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -9,6 +9,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
ALL CHANGES:
+ATLAS-352 Improve write performance on type and entity creation with Hbase (sumasai)
ATLAS-350 Document jaas config details for atlas (tbeerbower via shwethags)
ATLAS-344 Document HBase permissions for secure cluster (tbeerbower via shwethags)
ATLAS-335 Kerberized cluster: Atlas fails to come up with hbase as backend (sumasai via shwethags)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/pom.xml
----------------------------------------------------------------------
diff --git a/repository/pom.xml b/repository/pom.xml
index 28107e0..6e1baee 100755
--- a/repository/pom.xml
+++ b/repository/pom.xml
@@ -50,6 +50,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-titan</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
@@ -85,52 +90,6 @@
</dependency>
<dependency>
- <groupId>com.thinkaurelius.titan</groupId>
- <artifactId>titan-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.thinkaurelius.titan</groupId>
- <artifactId>titan-es</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.vividsolutions</groupId>
- <artifactId>jts</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.solr</groupId>
- <artifactId>solr-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.solr</groupId>
- <artifactId>solr-solrj</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.thinkaurelius.titan</groupId>
- <artifactId>titan-berkeleyje</artifactId>
- </dependency>
-
- <!-- Commenting out since titan-hbase classes are shaded for 1.x support -->
- <!--<dependency>-->
- <!--<groupId>com.thinkaurelius.titan</groupId>-->
- <!--<artifactId>titan-hbase</artifactId>-->
- <!--</dependency>-->
-
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.thinkaurelius.titan</groupId>
- <artifactId>titan-lucene</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.tinkerpop.gremlin</groupId>
<artifactId>gremlin-java</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
deleted file mode 100644
index e255f1b..0000000
--- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-
-/**
- * This interface hides ABI/API breaking changes that HBase has made to its Admin/HBaseAdmin over the course
- * of development from 0.94 to 1.0 and beyond.
- */
-public interface AdminMask extends Closeable
-{
-
- void clearTable(String tableName, long timestamp) throws IOException;
-
- HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException;
-
- boolean tableExists(String tableName) throws IOException;
-
- void createTable(HTableDescriptor desc) throws IOException;
-
- void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException;
-
- /**
- * Estimate the number of regionservers in the HBase cluster.
- *
- * This is usually implemented by calling
- * {@link HBaseAdmin#getClusterStatus()} and then
- * {@link ClusterStatus#getServers()} and finally {@code size()} on the
- * returned server list.
- *
- * @return the number of servers in the cluster or -1 if it could not be determined
- */
- int getEstimatedRegionServerCount();
-
- void disableTable(String tableName) throws IOException;
-
- void enableTable(String tableName) throws IOException;
-
- boolean isTableDisabled(String tableName) throws IOException;
-
- void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
deleted file mode 100644
index feb578b..0000000
--- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * This interface hides ABI/API breaking changes that HBase has made to its (H)Connection class over the course
- * of development from 0.94 to 1.0 and beyond.
- */
-public interface ConnectionMask extends Closeable
-{
-
- TableMask getTable(String name) throws IOException;
-
- AdminMask getAdmin() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
deleted file mode 100644
index 0cd4795..0000000
--- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.thinkaurelius.titan.util.system.IOUtils;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-
-public class HBaseAdmin0_98 implements AdminMask
-{
-
- private static final Logger log = LoggerFactory.getLogger(HBaseAdmin0_98.class);
-
- private final HBaseAdmin adm;
-
- public HBaseAdmin0_98(HBaseAdmin adm)
- {
- this.adm = adm;
- }
-
- @Override
- public void clearTable(String tableName, long timestamp) throws IOException
- {
- if (!adm.tableExists(tableName)) {
- log.debug("clearStorage() called before table {} was created, skipping.", tableName);
- return;
- }
-
- // Unfortunately, linear scanning and deleting tables is faster in HBase < 1 when running integration tests than
- // disabling and deleting tables.
- HTable table = null;
-
- try {
- table = new HTable(adm.getConfiguration(), tableName);
-
- Scan scan = new Scan();
- scan.setBatch(100);
- scan.setCacheBlocks(false);
- scan.setCaching(2000);
- scan.setTimeRange(0, Long.MAX_VALUE);
- scan.setMaxVersions(1);
-
- ResultScanner scanner = null;
-
- try {
- scanner = table.getScanner(scan);
-
- for (Result res : scanner) {
- Delete d = new Delete(res.getRow());
-
- d.setTimestamp(timestamp);
- table.delete(d);
- }
- } finally {
- IOUtils.closeQuietly(scanner);
- }
- } finally {
- IOUtils.closeQuietly(table);
- }
- }
-
- @Override
- public HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException
- {
- return adm.getTableDescriptor(tableName.getBytes());
- }
-
- @Override
- public boolean tableExists(String tableName) throws IOException
- {
- return adm.tableExists(tableName);
- }
-
- @Override
- public void createTable(HTableDescriptor desc) throws IOException
- {
- adm.createTable(desc);
- }
-
- @Override
- public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException
- {
- adm.createTable(desc, startKey, endKey, numRegions);
- }
-
- @Override
- public int getEstimatedRegionServerCount()
- {
- int serverCount = -1;
- try {
- serverCount = adm.getClusterStatus().getServers().size();
- log.debug("Read {} servers from HBase ClusterStatus", serverCount);
- } catch (IOException e) {
- log.debug("Unable to retrieve HBase cluster status", e);
- }
- return serverCount;
- }
-
- @Override
- public void disableTable(String tableName) throws IOException
- {
- adm.disableTable(tableName);
- }
-
- @Override
- public void enableTable(String tableName) throws IOException
- {
- adm.enableTable(tableName);
- }
-
- @Override
- public boolean isTableDisabled(String tableName) throws IOException
- {
- return adm.isTableDisabled(tableName);
- }
-
- @Override
- public void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException
- {
- adm.addColumn(tableName, columnDescriptor);
- }
-
- @Override
- public void close() throws IOException
- {
- adm.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
deleted file mode 100644
index 7e8f72d..0000000
--- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotDisabledException;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-
-public class HBaseAdmin1_0 implements AdminMask
-{
-
- private static final Logger log = LoggerFactory.getLogger(HBaseAdmin1_0.class);
-
- private final Admin adm;
-
- public HBaseAdmin1_0(HBaseAdmin adm)
- {
- this.adm = adm;
- }
- @Override
- public void clearTable(String tableString, long timestamp) throws IOException
- {
- TableName tableName = TableName.valueOf(tableString);
-
- if (!adm.tableExists(tableName)) {
- log.debug("Attempted to clear table {} before it exists (noop)", tableString);
- return;
- }
-
- if (!adm.isTableDisabled(tableName))
- adm.disableTable(tableName);
-
- if (!adm.isTableDisabled(tableName))
- throw new RuntimeException("Unable to disable table " + tableName);
-
- // This API call appears to both truncate and reenable the table.
- log.info("Truncating table {}", tableName);
- adm.truncateTable(tableName, true /* preserve splits */);
-
- try {
- adm.enableTable(tableName);
- } catch (TableNotDisabledException e) {
- // This triggers seemingly every time in testing with 1.0.2.
- log.debug("Table automatically reenabled by truncation: {}", tableName, e);
- }
- }
-
- @Override
- public HTableDescriptor getTableDescriptor(String tableString) throws TableNotFoundException, IOException
- {
- return adm.getTableDescriptor(TableName.valueOf(tableString));
- }
-
- @Override
- public boolean tableExists(String tableString) throws IOException
- {
- return adm.tableExists(TableName.valueOf(tableString));
- }
-
- @Override
- public void createTable(HTableDescriptor desc) throws IOException
- {
- adm.createTable(desc);
- }
-
- @Override
- public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException
- {
- adm.createTable(desc, startKey, endKey, numRegions);
- }
-
- @Override
- public int getEstimatedRegionServerCount()
- {
- int serverCount = -1;
- try {
- serverCount = adm.getClusterStatus().getServers().size();
- log.debug("Read {} servers from HBase ClusterStatus", serverCount);
- } catch (IOException e) {
- log.debug("Unable to retrieve HBase cluster status", e);
- }
- return serverCount;
- }
-
- @Override
- public void disableTable(String tableString) throws IOException
- {
- adm.disableTable(TableName.valueOf(tableString));
- }
-
- @Override
- public void enableTable(String tableString) throws IOException
- {
- adm.enableTable(TableName.valueOf(tableString));
- }
-
- @Override
- public boolean isTableDisabled(String tableString) throws IOException
- {
- return adm.isTableDisabled(TableName.valueOf(tableString));
- }
-
- @Override
- public void addColumn(String tableString, HColumnDescriptor columnDescriptor) throws IOException
- {
- adm.addColumn(TableName.valueOf(tableString), columnDescriptor);
- }
-
- @Override
- public void close() throws IOException
- {
- adm.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
deleted file mode 100644
index 3bc6c25..0000000
--- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.Delete;
-
-public interface HBaseCompat {
-
- /**
- * Configure the compression scheme {@code algo} on a column family
- * descriptor {@code cd}. The {@code algo} parameter is a string value
- * corresponding to one of the values of HBase's Compression enum. The
- * Compression enum has moved between packages as HBase has evolved, which
- * is why this method has a String argument in the signature instead of the
- * enum itself.
- *
- * @param cd
- * column family to configure
- * @param algo
- * compression type to use
- */
- public void setCompression(HColumnDescriptor cd, String algo);
-
- /**
- * Create and return a HTableDescriptor instance with the given name. The
- * constructors on this method have remained stable over HBase development
- * so far, but the old HTableDescriptor(String) constructor & byte[] friends
- * are now marked deprecated and may eventually be removed in favor of the
- * HTableDescriptor(TableName) constructor. That constructor (and the
- * TableName type) only exists in newer HBase versions. Hence this method.
- *
- * @param tableName
- * HBase table name
- * @return a new table descriptor instance
- */
- public HTableDescriptor newTableDescriptor(String tableName);
-
- ConnectionMask createConnection(Configuration conf) throws IOException;
-
- void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc);
-
- void setTimestamp(Delete d, long timestamp);
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
deleted file mode 100644
index 2c0f3b4..0000000
--- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.io.compress.Compression;
-
-public class HBaseCompat0_98 implements HBaseCompat {
-
- @Override
- public void setCompression(HColumnDescriptor cd, String algo) {
- cd.setCompressionType(Compression.Algorithm.valueOf(algo));
- }
-
- @Override
- public HTableDescriptor newTableDescriptor(String tableName) {
- TableName tn = TableName.valueOf(tableName);
- return new HTableDescriptor(tn);
- }
-
- @Override
- public ConnectionMask createConnection(Configuration conf) throws IOException
- {
- return new HConnection0_98(HConnectionManager.createConnection(conf));
- }
-
- @Override
- public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
- {
- tdesc.addFamily(cdesc);
- }
-
- @Override
- public void setTimestamp(Delete d, long timestamp)
- {
- d.setTimestamp(timestamp);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
deleted file mode 100644
index 633e525..0000000
--- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.io.compress.Compression;
-
-public class HBaseCompat1_0 implements HBaseCompat {
-
- @Override
- public void setCompression(HColumnDescriptor cd, String algo) {
- cd.setCompressionType(Compression.Algorithm.valueOf(algo));
- }
-
- @Override
- public HTableDescriptor newTableDescriptor(String tableName) {
- TableName tn = TableName.valueOf(tableName);
- return new HTableDescriptor(tn);
- }
-
- @Override
- public ConnectionMask createConnection(Configuration conf) throws IOException
- {
- return new HConnection1_0(ConnectionFactory.createConnection(conf));
- }
-
- @Override
- public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
- {
- tdesc.addFamily(cdesc);
- }
-
- @Override
- public void setTimestamp(Delete d, long timestamp)
- {
- d.setTimestamp(timestamp);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
deleted file mode 100644
index e5c3d31..0000000
--- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.io.compress.Compression;
-
-import java.io.IOException;
-
-/**
- * {@link HBaseCompat} variant that {@code HBaseCompatLoader} names both for runtime HBase
- * versions beginning with "1.1." and as its fallback default. Each method forwards to a
- * single HBase client call.
- */
-public class HBaseCompat1_1 implements HBaseCompat {
-
-    /** Applies the {@link Compression.Algorithm} constant named {@code algo} to {@code cd}. */
-    @Override
-    public void setCompression(HColumnDescriptor cd, String algo) {
-        final Compression.Algorithm algorithm = Compression.Algorithm.valueOf(algo);
-        cd.setCompressionType(algorithm);
-    }
-
-    /** Creates a descriptor for the table identified by {@code tableName}. */
-    @Override
-    public HTableDescriptor newTableDescriptor(String tableName) {
-        return new HTableDescriptor(TableName.valueOf(tableName));
-    }
-
-    /** Opens a connection via {@link ConnectionFactory} and wraps it in an {@link HConnection1_0}. */
-    @Override
-    public ConnectionMask createConnection(Configuration conf) throws IOException {
-        return new HConnection1_0(ConnectionFactory.createConnection(conf));
-    }
-
-    /** Registers the column family {@code cdesc} on the table descriptor {@code tdesc}. */
-    @Override
-    public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc) {
-        tdesc.addFamily(cdesc);
-    }
-
-    /** Stamps the delete {@code d} with {@code timestamp}. */
-    @Override
-    public void setTimestamp(Delete d, long timestamp) {
-        d.setTimestamp(timestamp);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
deleted file mode 100644
index 2c0d6fe..0000000
--- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import java.util.Arrays;
-
-import org.apache.hadoop.hbase.util.VersionInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Chooses, instantiates and caches the {@link HBaseCompat} implementation for this JVM.
- * <p/>
- * The first successful call to {@link #getCompat(String)} decides the implementation; every
- * later call returns that same instance regardless of its argument.
- */
-public class HBaseCompatLoader {
-
-    private static final Logger log = LoggerFactory.getLogger(HBaseCompatLoader.class);
-
-    private static final String DEFAULT_HBASE_COMPAT_VERSION = "1.1";
-
-    private static final String DEFAULT_HBASE_CLASS_NAME = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_1";
-
-    private static HBaseCompat cachedCompat;
-
-    /**
-     * Returns the process-wide {@link HBaseCompat}, creating it on first use.
-     *
-     * @param classOverride fully qualified class name to instantiate, or {@code null} to derive
-     *                      one from the HBase version reported by {@link VersionInfo}; ignored
-     *                      once an instance has been cached
-     * @return the cached compatibility layer
-     * @throws RuntimeException if the chosen class cannot be found, instantiated or accessed
-     */
-    public synchronized static HBaseCompat getCompat(String classOverride) {
-
-        if (null != cachedCompat) {
-            log.debug("Returning cached HBase compatibility layer: {}", cachedCompat);
-            return cachedCompat;
-        }
-
-        HBaseCompat compat;
-        String className = null;
-        String classNameSource = null;
-
-        if (null != classOverride) {
-            className = classOverride;
-            classNameSource = "from explicit configuration";
-        } else {
-            String hbaseVersion = VersionInfo.getVersion();
-            // NOTE(review): confirm HBaseCompat0_94 / HBaseCompat0_96 are on the classpath;
-            // if not, those versions end in the ClassNotFoundException branch below.
-            for (String supportedVersion : Arrays.asList("0.94", "0.96", "0.98", "1.0", "1.1")) {
-                if (hbaseVersion.startsWith(supportedVersion + ".")) {
-                    // "1.0" -> "...HBaseCompat1_0"
-                    className = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat" + supportedVersion.replaceAll("\\.", "_");
-                    classNameSource = "supporting runtime HBase version " + hbaseVersion;
-                    break;
-                }
-            }
-            if (null == className) {
-                log.info("The HBase version {} is not explicitly supported by Titan. " +
-                        "Loading Titan's compatibility layer for its most recent supported HBase version ({})",
-                        hbaseVersion, DEFAULT_HBASE_COMPAT_VERSION);
-                className = DEFAULT_HBASE_CLASS_NAME;
-                // No leading space: the log format below already separates the placeholder.
-                classNameSource = "by default";
-            }
-        }
-
-        final String errTemplate = " when instantiating HBase compatibility class " + className;
-
-        try {
-            compat = (HBaseCompat)Class.forName(className).newInstance();
-            log.info("Instantiated HBase compatibility layer {}: {}", classNameSource, compat.getClass().getCanonicalName());
-        } catch (IllegalAccessException e) {
-            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
-        } catch (InstantiationException e) {
-            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
-        }
-
-        return cachedCompat = compat;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
deleted file mode 100644
index 7783a43..0000000
--- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Copyright 2012-2013 Aurelius LLC
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.thinkaurelius.titan.diskstorage.hbase;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.thinkaurelius.titan.diskstorage.*;
-import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
-import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
-import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
-import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
-import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList;
-import com.thinkaurelius.titan.util.system.IOUtils;
-
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
-import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.*;
-
-/**
- * Here are some areas that might need work:
- * <p/>
- * - batching? (consider HTable#batch, HTable#setAutoFlush(false))
- * - tuning HTable#setWriteBufferSize (?)
- * - writing a server-side filter to replace ColumnCountGetFilter, which drops
- * all columns on the row where it reaches its limit. This requires getSlice,
- * currently, to impose its limit on the client side. That obviously won't
- * scale.
- * - RowMutations for combining Puts+Deletes (need a newer HBase than 0.92 for this)
- * - (maybe) fiddle with HTable#setRegionCachePrefetch and/or #prewarmRegionCache
- * <p/>
- * There may be other problem areas. These are just the ones of which I'm aware.
- */
-public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
-
- // Class-wide SLF4J logger; used for the debug output in getFilter and RowIterator#close.
- private static final Logger logger = LoggerFactory.getLogger(HBaseKeyColumnValueStore.class);
-
- // Name passed to cnx.getTable(...) whenever this store reads or scans.
- private final String tableName;
- // Supplies the entry meta-data schema and performs the actual writes (see mutateMany).
- private final HBaseStoreManager storeManager;
-
- // When using shortened CF names, columnFamily is the shortname and storeName is the longname
- // When not using shortened CF names, they are the same
- //private final String columnFamily;
- // Returned by getName() and used as the key when handing mutations to the store manager.
- private final String storeName;
- // This is columnFamily.getBytes()
- private final byte[] columnFamilyBytes;
- // Adapter that reads column, value and timestamp out of an HBase qualifier -> versions entry.
- private final HBaseGetter entryGetter;
-
- // Source of the per-operation table handles; never closed by this class.
- private final ConnectionMask cnx;
-
- /**
-  * Creates a store bound to one column family of one HBase table.
-  *
-  * @param storeManager manager that provides the meta-data schema and executes mutations
-  * @param cnx          connection used to obtain table handles
-  * @param tableName    name of the HBase table
-  * @param columnFamily column family name; converted with {@code String.getBytes()}, i.e. the
-  *                     platform default charset
-  * @param storeName    name reported by {@link #getName()}
-  */
- HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName) {
-     final byte[] familyBytes = columnFamily.getBytes();
-     final HBaseGetter getter = new HBaseGetter(storeManager.getMetaDataSchema(storeName));
-
-     this.storeManager = storeManager;
-     this.cnx = cnx;
-     this.tableName = tableName;
-     this.storeName = storeName;
-     this.columnFamilyBytes = familyBytes;
-     this.entryGetter = getter;
- }
-
- /**
-  * Does nothing. Table handles opened by this class are closed right after the read in
-  * getHelper or by the RowIterator returned from a scan, and the connection is not closed
-  * by this class.
-  */
- @Override
- public void close() throws BackendException {
- }
-
- /**
-  * Reads the column slice described by {@code query} for its single key.
-  *
-  * @return the entries found for the key, or {@code EntryList.EMPTY_LIST} when the lookup
-  *         yields no value
-  */
- @Override
- public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
-     final List<StaticBuffer> singleKey = Arrays.asList(query.getKey());
-     final Filter columnFilter = getFilter(query);
-     final Map<StaticBuffer, EntryList> byKey = getHelper(singleKey, columnFilter);
-     return Iterables.getOnlyElement(byKey.values(), EntryList.EMPTY_LIST);
- }
-
- /**
-  * Reads the same column slice for every key in {@code keys}.
-  *
-  * @return a map from key to the entries found for it
-  */
- @Override
- public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
-     final Filter columnFilter = getFilter(query);
-     return getHelper(keys, columnFilter);
- }
-
- /**
-  * Applies additions and deletions to a single key by wrapping them in a one-entry
-  * mutation map and handing that to {@code mutateMany}.
-  */
- @Override
- public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
-     final KCVMutation change = new KCVMutation(additions, deletions);
-     final Map<StaticBuffer, KCVMutation> singleKeyChange = ImmutableMap.of(key, change);
-     mutateMany(singleKeyChange, txh);
- }
-
- /**
-  * Locking is not offered by this store.
-  *
-  * @throws UnsupportedOperationException always
-  */
- @Override
- public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh)
-         throws BackendException {
-     throw new UnsupportedOperationException();
- }
-
- /**
-  * Scans the rows between the start and end key of {@code query}, restricted to the
-  * query's column slice.
-  */
- @Override
- public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException {
-     final byte[] firstRow = query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY);
-     final byte[] stopRow = query.getKeyEnd().as(StaticBuffer.ARRAY_FACTORY);
-     final FilterList rowFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
-     return executeKeySliceQuery(firstRow, stopRow, rowFilters, query);
- }
-
- /** Returns the store name supplied to the constructor. */
- @Override
- public String getName() {
- return storeName;
- }
-
- /**
-  * Scans the table without an explicit start or end row, restricted to the column slice
-  * of {@code query}.
-  */
- @Override
- public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException {
-     final FilterList rowFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
-     return executeKeySliceQuery(null, null, rowFilters, query);
- }
-
- /**
-  * Translates a column slice into an HBase {@link Filter}.
-  * <p/>
-  * The slice becomes a {@link ColumnRangeFilter} with an inclusive start and an exclusive
-  * end. When the query carries a limit, the range filter is combined (MUST_PASS_ALL) with a
-  * {@link ColumnPaginationFilter} of that limit and offset 0.
-  *
-  * @param query slice whose start, end and optional limit are translated
-  * @return the filter to attach to a Get or Scan
-  */
- public static Filter getFilter(SliceQuery query) {
-     final StaticBuffer sliceStart = query.getSliceStart();
-     final StaticBuffer sliceEnd = query.getSliceEnd();
-
-     // Each bound is decided by its OWN length: an empty bound is passed as null, a non-empty
-     // one as its bytes. Previously the start bound was guarded by the end's length, which
-     // dropped a non-empty start whenever the end was empty.
-     byte[] colStartBytes = sliceStart.length() > 0 ? sliceStart.as(StaticBuffer.ARRAY_FACTORY) : null;
-     byte[] colEndBytes = sliceEnd.length() > 0 ? sliceEnd.as(StaticBuffer.ARRAY_FACTORY) : null;
-
-     Filter filter = new ColumnRangeFilter(colStartBytes, true, colEndBytes, false);
-
-     if (query.hasLimit()) {
-         filter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
-                 filter,
-                 new ColumnPaginationFilter(query.getLimit(), 0));
-     }
-
-     logger.debug("Generated HBase Filter {}", filter);
-
-     return filter;
- }
-
- /**
-  * Fetches this store's column family for every key in one batched get.
-  *
-  * @param keys      row keys to read
-  * @param getFilter column filter attached to every Get
-  * @return map from each key to its entries ({@code EntryList.EMPTY_LIST} where the row or
-  *         the column family is absent)
-  * @throws PermanentBackendException if setting the time range on a Get fails
-  * @throws TemporaryBackendException if opening the table or executing the gets fails
-  */
- private Map<StaticBuffer,EntryList> getHelper(List<StaticBuffer> keys, Filter getFilter) throws BackendException {
-     final List<Get> gets = new ArrayList<Get>(keys.size());
-     for (StaticBuffer rowKey : keys) {
-         final Get get = new Get(rowKey.as(StaticBuffer.ARRAY_FACTORY))
-                 .addFamily(columnFamilyBytes)
-                 .setFilter(getFilter);
-         try {
-             get.setTimeRange(0, Long.MAX_VALUE);
-         } catch (IOException e) {
-             throw new PermanentBackendException(e);
-         }
-         gets.add(get);
-     }
-
-     final Map<StaticBuffer,EntryList> entriesByKey = new HashMap<StaticBuffer,EntryList>(keys.size());
-
-     try {
-         Result[] fetched = null;
-         TableMask handle = null;
-
-         // The table handle is only needed for the round trip; release it before decoding.
-         try {
-             handle = cnx.getTable(tableName);
-             fetched = handle.get(gets);
-         } finally {
-             IOUtils.closeQuietly(handle);
-         }
-
-         if (fetched == null) {
-             return KCVSUtil.emptyResults(keys);
-         }
-
-         assert fetched.length==keys.size();
-
-         for (int idx = 0; idx < fetched.length; idx++) {
-             // family -> qualifier -> timestamp -> value; null when the row has no data
-             final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> families = fetched[idx].getMap();
-             final StaticBuffer rowKey = keys.get(idx);
-
-             final NavigableMap<byte[], NavigableMap<Long, byte[]>> columns =
-                     (families == null) ? null : families.get(columnFamilyBytes);
-
-             if (columns == null) {
-                 entriesByKey.put(rowKey, EntryList.EMPTY_LIST);
-             } else {
-                 entriesByKey.put(rowKey, StaticArrayEntryList.ofBytes(columns.entrySet(), entryGetter));
-             }
-         }
-
-         return entriesByKey;
-     } catch (IOException e) {
-         throw new TemporaryBackendException(e);
-     }
- }
-
- /** Forwards {@code mutations} to the store manager, keyed by this store's name. */
- private void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
-     final Map<String, Map<StaticBuffer, KCVMutation>> byStore = ImmutableMap.of(storeName, mutations);
-     storeManager.mutateMany(byStore, txh);
- }
-
- /**
-  * Delegates to the four-argument overload with a null start key and a null end key, which
-  * leaves the scan's row range at its default.
-  */
- private KeyIterator executeKeySliceQuery(FilterList filters, @Nullable SliceQuery columnSlice) throws BackendException {
- return executeKeySliceQuery(null, null, filters, columnSlice);
- }
-
- /**
-  * Opens a scan over this store's column family and wraps it in a key iterator.
-  *
-  * @param startKey    first row of the scan, or {@code null} to leave the start row unset
-  * @param endKey      stop row of the scan, or {@code null} to leave the stop row unset
-  * @param filters     filter list applied to the scan; the column-slice filter is appended
-  *                    to this very instance when {@code columnSlice} is non-null
-  * @param columnSlice optional column slice to restrict each row to
-  * @return iterator over the matching rows; the caller closes it to release the table
-  * @throws PermanentBackendException if configuring the scan, opening the table or
-  *                                   creating the scanner fails
-  */
- private KeyIterator executeKeySliceQuery(@Nullable byte[] startKey,
-         @Nullable byte[] endKey,
-         FilterList filters,
-         @Nullable SliceQuery columnSlice) throws BackendException {
-     final Scan familyScan = new Scan().addFamily(columnFamilyBytes);
-
-     try {
-         familyScan.setTimeRange(0, Long.MAX_VALUE);
-     } catch (IOException e) {
-         throw new PermanentBackendException(e);
-     }
-
-     if (startKey != null) {
-         familyScan.setStartRow(startKey);
-     }
-     if (endKey != null) {
-         familyScan.setStopRow(endKey);
-     }
-     if (columnSlice != null) {
-         filters.addFilter(getFilter(columnSlice));
-     }
-
-     TableMask handle = null;
-     try {
-         handle = cnx.getTable(tableName);
-         final ResultScanner scanner = handle.getScanner(familyScan.setFilter(filters));
-         return new RowIterator(handle, scanner, columnFamilyBytes);
-     } catch (IOException e) {
-         // The iterator never took ownership, so the handle has to be released here.
-         IOUtils.closeQuietly(handle);
-         throw new PermanentBackendException(e);
-     }
- }
-
- /**
-  * Iterates over the row keys of a scan and exposes the columns of the most recently
-  * returned row through {@link #getEntries()}.
-  * <p/>
-  * Closing the iterator releases both the scanner and the table handle it was created with.
-  */
- private class RowIterator implements KeyIterator {
-     private final Closeable table;
-     // Kept so that close() can release it; closing the table alone leaves it open.
-     private final ResultScanner scanner;
-     private final Iterator<Result> rows;
-     private final byte[] columnFamilyBytes;
-
-     private Result currentRow;
-     private boolean isClosed;
-
-     /**
-      * @param table             handle to close together with this iterator
-      * @param rows              scanner supplying the rows; closed together with this iterator
-      * @param columnFamilyBytes column family to read entries from (copied defensively)
-      */
-     public RowIterator(Closeable table, ResultScanner rows, byte[] columnFamilyBytes) {
-         this.table = table;
-         this.scanner = rows;
-         this.columnFamilyBytes = Arrays.copyOf(columnFamilyBytes, columnFamilyBytes.length);
-         // Skip null results and rows whose key cannot be read as a long at offset 0.
-         // NOTE(review): confirm NumberFormatException is what StaticBuffer#getLong raises
-         // for malformed row keys.
-         this.rows = Iterators.filter(rows.iterator(), new Predicate<Result>() {
-             @Override
-             public boolean apply(@Nullable Result result) {
-                 if (result == null)
-                     return false;
-
-                 try {
-                     StaticBuffer id = StaticArrayBuffer.of(result.getRow());
-                     id.getLong(0);
-                 } catch (NumberFormatException e) {
-                     return false;
-                 }
-
-                 return true;
-             }
-         });
-     }
-
-     /**
-      * Returns an iterator over the columns of the row last returned by {@link #next()}.
-      * NOTE(review): closing the returned iterator marks this whole RowIterator as closed
-      * without releasing the table; confirm that callers rely on this.
-      */
-     @Override
-     public RecordIterator<Entry> getEntries() {
-         ensureOpen();
-
-         return new RecordIterator<Entry>() {
-             private final Iterator<Map.Entry<byte[], NavigableMap<Long, byte[]>>> kv = currentRow.getMap().get(columnFamilyBytes).entrySet().iterator();
-
-             @Override
-             public boolean hasNext() {
-                 ensureOpen();
-                 return kv.hasNext();
-             }
-
-             @Override
-             public Entry next() {
-                 ensureOpen();
-                 return StaticArrayEntry.ofBytes(kv.next(), entryGetter);
-             }
-
-             @Override
-             public void close() {
-                 isClosed = true;
-             }
-
-             @Override
-             public void remove() {
-                 throw new UnsupportedOperationException();
-             }
-         };
-     }
-
-     @Override
-     public boolean hasNext() {
-         ensureOpen();
-         return rows.hasNext();
-     }
-
-     /** Advances to the next row and returns its key. */
-     @Override
-     public StaticBuffer next() {
-         ensureOpen();
-
-         currentRow = rows.next();
-         return StaticArrayBuffer.of(currentRow.getRow());
-     }
-
-     /** Releases the scanner, then the table handle, and marks the iterator closed. */
-     @Override
-     public void close() {
-         IOUtils.closeQuietly(scanner);
-         IOUtils.closeQuietly(table);
-         isClosed = true;
-         logger.debug("RowIterator closed table {}", table);
-     }
-
-     @Override
-     public void remove() {
-         throw new UnsupportedOperationException();
-     }
-
-     /** @throws IllegalStateException if this iterator has already been closed */
-     private void ensureOpen() {
-         if (isClosed)
-             throw new IllegalStateException("Iterator has been closed.");
-     }
- }
-
- /**
-  * Reads column, value and meta data out of one HBase "qualifier -> (timestamp -> value)"
-  * entry. Value and timestamp both come from the last entry of the timestamp map.
-  */
- private static class HBaseGetter implements StaticArrayEntry.GetColVal<Map.Entry<byte[], NavigableMap<Long, byte[]>>, byte[]> {
-
-     private final EntryMetaData[] metaSchema;
-
-     private HBaseGetter(EntryMetaData[] schema) {
-         this.metaSchema = schema;
-     }
-
-     /** Returns the last (highest-keyed) timestamp/value pair of {@code element}. */
-     private static Map.Entry<Long, byte[]> newest(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
-         final NavigableMap<Long, byte[]> versions = element.getValue();
-         return versions.lastEntry();
-     }
-
-     @Override
-     public byte[] getColumn(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
-         return element.getKey();
-     }
-
-     @Override
-     public byte[] getValue(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
-         return newest(element).getValue();
-     }
-
-     @Override
-     public EntryMetaData[] getMetaSchema(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
-         return metaSchema;
-     }
-
-     /** Only {@code TIMESTAMP} is available; any other kind is rejected. */
-     @Override
-     public Object getMetaData(Map.Entry<byte[], NavigableMap<Long, byte[]>> element, EntryMetaData meta) {
-         switch (meta) {
-             case TIMESTAMP:
-                 return newest(element).getKey();
-             default:
-                 throw new UnsupportedOperationException("Unsupported meta data: " + meta);
-         }
-     }
- }
-}