You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by dk...@apache.org on 2016/08/05 23:27:38 UTC
[8/9] incubator-atlas git commit: ATLAS-693 Titan 0.5.4
implementation of graph db abstraction. (jnhagelb via dkantor)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/pom.xml
----------------------------------------------------------------------
diff --git a/graphdb/titan0/pom.xml b/graphdb/titan0/pom.xml
new file mode 100644
index 0000000..f2dc9a8
--- /dev/null
+++ b/graphdb/titan0/pom.xml
@@ -0,0 +1,257 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one
+~ or more contributor license agreements. See the NOTICE file
+~ distributed with this work for additional information
+~ regarding copyright ownership. The ASF licenses this file
+~ to you under the Apache License, Version 2.0 (the
+~ "License"); you may not use this file except in compliance
+~ with the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>atlas-graphdb</artifactId>
+ <groupId>org.apache.atlas</groupId>
+ <version>0.8-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>atlas-graphdb-titan0</artifactId>
+ <description>Apache Atlas Titan 0.5.4 Graph DB Impl</description>
+ <name>Apache Atlas Titan 0.5.4 Graph DB Impl</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <tinkerpop.version>2.6.0</tinkerpop.version>
+ <titan.version>0.5.4</titan.version>
+ </properties>
+
+ <dependencies>
+
+ <!-- for graphdb interface definitions -->
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-graphdb-api</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-graphdb-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.thinkaurelius.titan</groupId>
+ <artifactId>titan-core</artifactId>
+ <version>${titan.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.tinkerpop.blueprints</groupId>
+ <artifactId>blueprints-core</artifactId>
+ <version>${tinkerpop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.tinkerpop.gremlin</groupId>
+ <artifactId>gremlin-java</artifactId>
+ <version>${tinkerpop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.vividsolutions</groupId>
+ <artifactId>jts</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-solrj</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.thinkaurelius.titan</groupId>
+ <artifactId>titan-es</artifactId>
+ <version>${titan.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.thinkaurelius.titan</groupId>
+ <artifactId>titan-berkeleyje</artifactId>
+ <version>${titan.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.thinkaurelius.titan</groupId>
+ <artifactId>titan-lucene</artifactId>
+ <version>${titan.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!--
+ Create 'uber' jar that contains all of the dependencies (except those whose scope is provided)
+ Only Titan 0.5.4 and its dependencies are included. The other dependencies are bundled in the war file.
+ -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <artifactSet>
+ <excludes>
+ <!-- these are bundled with Atlas -->
+ <exclude>org.slf4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>com.thinkaurelius.titan:titan-core</artifact>
+ <!-- force use of our custom LocalLockMediator implementation -->
+ <excludes>
+ <exclude>com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator*</exclude>
+ </excludes>
+ </filter>
+
+ </filters>
+ <createSourcesJar>true</createSourcesJar>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <excludes>
+ <exclude>**/log4j.xml</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencyManagement>
+ <dependencies>
+ <!-- Graph DB -->
+ <dependency>
+ <groupId>com.tinkerpop.blueprints</groupId>
+ <artifactId>blueprints-core</artifactId>
+ <version>${tinkerpop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.thinkaurelius.titan</groupId>
+ <artifactId>titan-core</artifactId>
+ <version>${titan.version}</version>
+ <exclusions>
+ <!-- rexster does not work with servlet-api -->
+ <exclusion>
+ <groupId>com.tinkerpop.rexster</groupId>
+ <artifactId>rexster-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.tinkerpop.rexster</groupId>
+ <artifactId>rexster-server</artifactId>
+ </exclusion>
+ <!-- asm 4.0 does not work with jersey asm 3.1 -->
+ <exclusion>
+ <groupId>com.tinkerpop</groupId>
+ <artifactId>frames</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.esotericsoftware.reflectasm</groupId>
+ <artifactId>reflectasm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ <exclusion> <!-- GPL license imported from ganglia -->
+ <groupId>org.acplt</groupId>
+ <artifactId>oncrpc</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.thinkaurelius.titan</groupId>
+ <artifactId>titan-berkeleyje</artifactId>
+ <version>${titan.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.thinkaurelius.titan</groupId>
+ <artifactId>titan-hbase</artifactId>
+ <version>${titan.version}</version>
+ </dependency>
+
+ </dependencies>
+ </dependencyManagement>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
new file mode 100644
index 0000000..e255f1b
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+/**
+ * This interface hides ABI/API breaking changes that HBase has made to its Admin/HBaseAdmin over the course
+ * of development from 0.94 to 1.0 and beyond.
+ */
+public interface AdminMask extends Closeable
+{
+
+ void clearTable(String tableName, long timestamp) throws IOException;
+
+ HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException;
+
+ boolean tableExists(String tableName) throws IOException;
+
+ void createTable(HTableDescriptor desc) throws IOException;
+
+ void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException;
+
+ /**
+ * Estimate the number of regionservers in the HBase cluster.
+ *
+ * This is usually implemented by calling
+ * {@link HBaseAdmin#getClusterStatus()} and then
+ * {@link ClusterStatus#getServers()} and finally {@code size()} on the
+ * returned server list.
+ *
+ * @return the number of servers in the cluster or -1 if it could not be determined
+ */
+ int getEstimatedRegionServerCount();
+
+ void disableTable(String tableName) throws IOException;
+
+ void enableTable(String tableName) throws IOException;
+
+ boolean isTableDisabled(String tableName) throws IOException;
+
+ void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
new file mode 100644
index 0000000..feb578b
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * This interface hides ABI/API breaking changes that HBase has made to its (H)Connection class over the course
+ * of development from 0.94 to 1.0 and beyond.
+ */
+public interface ConnectionMask extends Closeable
+{
+
+ TableMask getTable(String name) throws IOException;
+
+ AdminMask getAdmin() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
new file mode 100644
index 0000000..0cd4795
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.thinkaurelius.titan.util.system.IOUtils;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+
+public class HBaseAdmin0_98 implements AdminMask
+{
+
+ private static final Logger log = LoggerFactory.getLogger(HBaseAdmin0_98.class);
+
+ private final HBaseAdmin adm;
+
+ public HBaseAdmin0_98(HBaseAdmin adm)
+ {
+ this.adm = adm;
+ }
+
+ @Override
+ public void clearTable(String tableName, long timestamp) throws IOException
+ {
+ if (!adm.tableExists(tableName)) {
+ log.debug("clearStorage() called before table {} was created, skipping.", tableName);
+ return;
+ }
+
+ // Unfortunately, linear scanning and deleting tables is faster in HBase < 1 when running integration tests than
+ // disabling and deleting tables.
+ HTable table = null;
+
+ try {
+ table = new HTable(adm.getConfiguration(), tableName);
+
+ Scan scan = new Scan();
+ scan.setBatch(100);
+ scan.setCacheBlocks(false);
+ scan.setCaching(2000);
+ scan.setTimeRange(0, Long.MAX_VALUE);
+ scan.setMaxVersions(1);
+
+ ResultScanner scanner = null;
+
+ try {
+ scanner = table.getScanner(scan);
+
+ for (Result res : scanner) {
+ Delete d = new Delete(res.getRow());
+
+ d.setTimestamp(timestamp);
+ table.delete(d);
+ }
+ } finally {
+ IOUtils.closeQuietly(scanner);
+ }
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ }
+
+ @Override
+ public HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException
+ {
+ return adm.getTableDescriptor(tableName.getBytes());
+ }
+
+ @Override
+ public boolean tableExists(String tableName) throws IOException
+ {
+ return adm.tableExists(tableName);
+ }
+
+ @Override
+ public void createTable(HTableDescriptor desc) throws IOException
+ {
+ adm.createTable(desc);
+ }
+
+ @Override
+ public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException
+ {
+ adm.createTable(desc, startKey, endKey, numRegions);
+ }
+
+ @Override
+ public int getEstimatedRegionServerCount()
+ {
+ int serverCount = -1;
+ try {
+ serverCount = adm.getClusterStatus().getServers().size();
+ log.debug("Read {} servers from HBase ClusterStatus", serverCount);
+ } catch (IOException e) {
+ log.debug("Unable to retrieve HBase cluster status", e);
+ }
+ return serverCount;
+ }
+
+ @Override
+ public void disableTable(String tableName) throws IOException
+ {
+ adm.disableTable(tableName);
+ }
+
+ @Override
+ public void enableTable(String tableName) throws IOException
+ {
+ adm.enableTable(tableName);
+ }
+
+ @Override
+ public boolean isTableDisabled(String tableName) throws IOException
+ {
+ return adm.isTableDisabled(tableName);
+ }
+
+ @Override
+ public void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException
+ {
+ adm.addColumn(tableName, columnDescriptor);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ adm.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
new file mode 100644
index 0000000..7e8f72d
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+public class HBaseAdmin1_0 implements AdminMask
+{
+
+ private static final Logger log = LoggerFactory.getLogger(HBaseAdmin1_0.class);
+
+ private final Admin adm;
+
+ public HBaseAdmin1_0(HBaseAdmin adm)
+ {
+ this.adm = adm;
+ }
+ @Override
+ public void clearTable(String tableString, long timestamp) throws IOException
+ {
+ TableName tableName = TableName.valueOf(tableString);
+
+ if (!adm.tableExists(tableName)) {
+ log.debug("Attempted to clear table {} before it exists (noop)", tableString);
+ return;
+ }
+
+ if (!adm.isTableDisabled(tableName))
+ adm.disableTable(tableName);
+
+ if (!adm.isTableDisabled(tableName))
+ throw new RuntimeException("Unable to disable table " + tableName);
+
+ // This API call appears to both truncate and reenable the table.
+ log.info("Truncating table {}", tableName);
+ adm.truncateTable(tableName, true /* preserve splits */);
+
+ try {
+ adm.enableTable(tableName);
+ } catch (TableNotDisabledException e) {
+ // This triggers seemingly every time in testing with 1.0.2.
+ log.debug("Table automatically reenabled by truncation: {}", tableName, e);
+ }
+ }
+
+ @Override
+ public HTableDescriptor getTableDescriptor(String tableString) throws TableNotFoundException, IOException
+ {
+ return adm.getTableDescriptor(TableName.valueOf(tableString));
+ }
+
+ @Override
+ public boolean tableExists(String tableString) throws IOException
+ {
+ return adm.tableExists(TableName.valueOf(tableString));
+ }
+
+ @Override
+ public void createTable(HTableDescriptor desc) throws IOException
+ {
+ adm.createTable(desc);
+ }
+
+ @Override
+ public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException
+ {
+ adm.createTable(desc, startKey, endKey, numRegions);
+ }
+
+ @Override
+ public int getEstimatedRegionServerCount()
+ {
+ int serverCount = -1;
+ try {
+ serverCount = adm.getClusterStatus().getServers().size();
+ log.debug("Read {} servers from HBase ClusterStatus", serverCount);
+ } catch (IOException e) {
+ log.debug("Unable to retrieve HBase cluster status", e);
+ }
+ return serverCount;
+ }
+
+ @Override
+ public void disableTable(String tableString) throws IOException
+ {
+ adm.disableTable(TableName.valueOf(tableString));
+ }
+
+ @Override
+ public void enableTable(String tableString) throws IOException
+ {
+ adm.enableTable(TableName.valueOf(tableString));
+ }
+
+ @Override
+ public boolean isTableDisabled(String tableString) throws IOException
+ {
+ return adm.isTableDisabled(TableName.valueOf(tableString));
+ }
+
+ @Override
+ public void addColumn(String tableString, HColumnDescriptor columnDescriptor) throws IOException
+ {
+ adm.addColumn(TableName.valueOf(tableString), columnDescriptor);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ adm.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
new file mode 100644
index 0000000..c9b03aa
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+
+public interface HBaseCompat {
+
+ /**
+ * Configure the compression scheme {@code algo} on a column family
+ * descriptor {@code cd}. The {@code algo} parameter is a string value
+ * corresponding to one of the values of HBase's Compression enum. The
+ * Compression enum has moved between packages as HBase has evolved, which
+ * is why this method has a String argument in the signature instead of the
+ * enum itself.
+ *
+ * @param cd
+ * column family to configure
+ * @param algo
+ * compression type to use
+ */
+ public void setCompression(HColumnDescriptor cd, String algo);
+
+ /**
+ * Create and return a HTableDescriptor instance with the given name. The
+ * constructors of that class have remained stable over HBase development
+ * so far, but the old HTableDescriptor(String) constructor & byte[] friends
+ * are now marked deprecated and may eventually be removed in favor of the
+ * HTableDescriptor(TableName) constructor. That constructor (and the
+ * TableName type) only exists in newer HBase versions. Hence this method.
+ *
+ * @param tableName
+ * HBase table name
+ * @return a new table descriptor instance
+ */
+ public HTableDescriptor newTableDescriptor(String tableName);
+
+ ConnectionMask createConnection(Configuration conf) throws IOException;
+
+ void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc);
+
+ void setTimestamp(Delete d, long timestamp);
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
new file mode 100644
index 0000000..2c0f3b4
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.io.compress.Compression;
+
+public class HBaseCompat0_98 implements HBaseCompat {
+
+ @Override
+ public void setCompression(HColumnDescriptor cd, String algo) {
+ cd.setCompressionType(Compression.Algorithm.valueOf(algo));
+ }
+
+ @Override
+ public HTableDescriptor newTableDescriptor(String tableName) {
+ TableName tn = TableName.valueOf(tableName);
+ return new HTableDescriptor(tn);
+ }
+
+ @Override
+ public ConnectionMask createConnection(Configuration conf) throws IOException
+ {
+ return new HConnection0_98(HConnectionManager.createConnection(conf));
+ }
+
+ @Override
+ public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
+ {
+ tdesc.addFamily(cdesc);
+ }
+
+ @Override
+ public void setTimestamp(Delete d, long timestamp)
+ {
+ d.setTimestamp(timestamp);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
new file mode 100644
index 0000000..bb3fb3b
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.io.compress.Compression;
+
+public class HBaseCompat1_0 implements HBaseCompat {
+
+ @Override
+ public void setCompression(HColumnDescriptor cd, String algo) {
+ cd.setCompressionType(Compression.Algorithm.valueOf(algo));
+ }
+
+ @Override
+ public HTableDescriptor newTableDescriptor(String tableName) {
+ TableName tn = TableName.valueOf(tableName);
+ return new HTableDescriptor(tn);
+ }
+
+ @Override
+ public ConnectionMask createConnection(Configuration conf) throws IOException
+ {
+ return new HConnection1_0(ConnectionFactory.createConnection(conf));
+ }
+
+ @Override
+ public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
+ {
+ tdesc.addFamily(cdesc);
+ }
+
+ @Override
+ public void setTimestamp(Delete d, long timestamp)
+ {
+ d.setTimestamp(timestamp);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
new file mode 100644
index 0000000..e5c3d31
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.io.compress.Compression;
+
+import java.io.IOException;
+
+public class HBaseCompat1_1 implements HBaseCompat {
+
+ @Override
+ public void setCompression(HColumnDescriptor cd, String algo) {
+ cd.setCompressionType(Compression.Algorithm.valueOf(algo));
+ }
+
+ @Override
+ public HTableDescriptor newTableDescriptor(String tableName) {
+ TableName tn = TableName.valueOf(tableName);
+ return new HTableDescriptor(tn);
+ }
+
+ @Override
+ public ConnectionMask createConnection(Configuration conf) throws IOException
+ {
+ return new HConnection1_0(ConnectionFactory.createConnection(conf));
+ }
+
+ @Override
+ public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
+ {
+ tdesc.addFamily(cdesc);
+ }
+
+ @Override
+ public void setTimestamp(Delete d, long timestamp)
+ {
+ d.setTimestamp(timestamp);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
new file mode 100644
index 0000000..2c0d6fe
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Chooses, instantiates and caches the {@link HBaseCompat} implementation used
+ * by this process.
+ */
+public class HBaseCompatLoader {
+
+    private static final Logger log = LoggerFactory.getLogger(HBaseCompatLoader.class);
+
+    private static final String DEFAULT_HBASE_COMPAT_VERSION = "1.1";
+
+    private static final String DEFAULT_HBASE_CLASS_NAME = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_1";
+
+    // First successfully created instance; only touched inside the synchronized getCompat.
+    private static HBaseCompat cachedCompat;
+
+    /**
+     * Returns the HBase compatibility layer, creating it on the first call.
+     * <p>
+     * The class to load is {@code classOverride} when it is non-null. Otherwise it is
+     * derived from the runtime HBase version reported by {@link VersionInfo#getVersion()},
+     * falling back to {@code DEFAULT_HBASE_CLASS_NAME} when that version matches none
+     * of the known version prefixes.
+     * <p>
+     * Once an instance has been created it is returned for every later call, so a
+     * different {@code classOverride} passed afterwards has no effect.
+     *
+     * @param classOverride fully qualified name of the compat class to load, or
+     *                      {@code null} to select one from the runtime HBase version
+     * @return the cached or newly created compatibility layer
+     * @throws RuntimeException if the selected class cannot be found or instantiated
+     */
+    public synchronized static HBaseCompat getCompat(String classOverride) {
+
+        if (null != cachedCompat) {
+            log.debug("Returning cached HBase compatibility layer: {}", cachedCompat);
+            return cachedCompat;
+        }
+
+        HBaseCompat compat;
+        String className = null;
+        String classNameSource = null;
+
+        if (null != classOverride) {
+            className = classOverride;
+            classNameSource = "from explicit configuration";
+        } else {
+            String hbaseVersion = VersionInfo.getVersion();
+            for (String supportedVersion : Arrays.asList("0.94", "0.96", "0.98", "1.0", "1.1")) {
+                // Match on "<major.minor>." so that e.g. "1.1" does not also match "1.10.x".
+                if (hbaseVersion.startsWith(supportedVersion + ".")) {
+                    className = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat" + supportedVersion.replace('.', '_');
+                    classNameSource = "supporting runtime HBase version " + hbaseVersion;
+                    break;
+                }
+            }
+            if (null == className) {
+                log.info("The HBase version {} is not explicitly supported by Titan. " +
+                         "Loading Titan's compatibility layer for its most recent supported HBase version ({})",
+                        hbaseVersion, DEFAULT_HBASE_COMPAT_VERSION);
+                className = DEFAULT_HBASE_CLASS_NAME;
+                // No leading space: the log pattern below already separates the words.
+                classNameSource = "by default";
+            }
+        }
+
+        final String errTemplate = " when instantiating HBase compatibility class " + className;
+
+        try {
+            compat = (HBaseCompat)Class.forName(className).newInstance();
+            log.info("Instantiated HBase compatibility layer {}: {}", classNameSource, compat.getClass().getCanonicalName());
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
+        } catch (InstantiationException e) {
+            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
+        }
+
+        return cachedCompat = compat;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
new file mode 100644
index 0000000..c5f6e0d
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
@@ -0,0 +1,425 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.thinkaurelius.titan.core.attribute.Duration;
+import com.thinkaurelius.titan.diskstorage.*;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
+import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
+import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
+import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
+import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList;
+import com.thinkaurelius.titan.diskstorage.util.time.Timepoint;
+import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import com.thinkaurelius.titan.util.system.IOUtils;
+
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
+import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Here are some areas that might need work:
+ * <p/>
+ * - batching? (consider HTable#batch, HTable#setAutoFlush(false))
+ * - tuning HTable#setWriteBufferSize (?)
+ * - writing a server-side filter to replace ColumnCountGetFilter, which drops
+ * all columns on the row where it reaches its limit. This requires getSlice,
+ * currently, to impose its limit on the client side. That obviously won't
+ * scale.
+ * - RowMutations for combining Puts+Deletes (need a newer HBase than 0.92 for this)
+ * - (maybe) fiddle with HTable#setRegionCachePrefetch and/or #prewarmRegionCache
+ * <p/>
+ * There may be other problem areas. These are just the ones of which I'm aware.
+ */
+public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
+
+ private static final Logger logger = LoggerFactory.getLogger(HBaseKeyColumnValueStore.class);
+
+ private final String tableName;
+ private final HBaseStoreManager storeManager;
+
+ // When using shortened CF names, columnFamily is the shortname and storeName is the longname
+ // When not using shortened CF names, they are the same
+ //private final String columnFamily;
+ private final String storeName;
+ // This is columnFamily.getBytes()
+ private final byte[] columnFamilyBytes;
+ private final HBaseGetter entryGetter;
+
+ private final ConnectionMask cnx;
+
+ private LocalLockMediator<StoreTransaction> localLockMediator;
+
+ private final Duration lockExpiryTimeMs;
+ private final Duration lockMaxWaitTimeMs;
+ private final Integer lockMaxRetries;
+
+    /**
+     * Creates a store backed by one column family of an HBase table.
+     *
+     * @param storeManager manager supplying the storage configuration and the entry metadata schema
+     * @param cnx          connection used to obtain table handles
+     * @param tableName    name of the HBase table
+     * @param columnFamily column family name (the short name when shortened CF names are in use)
+     * @param storeName    store name reported by {@link #getName()}
+     * @param llm          mediator used to take local locks in {@code acquireLock}
+     */
+    HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName, LocalLockMediator<StoreTransaction> llm) {
+        this.storeManager = storeManager;
+        this.cnx = cnx;
+        this.tableName = tableName;
+        //this.columnFamily = columnFamily;
+        this.storeName = storeName;
+        // Bytes.toBytes always encodes as UTF-8; String#getBytes() would depend on the
+        // JVM's default charset and could yield different family bytes on other platforms.
+        this.columnFamilyBytes = Bytes.toBytes(columnFamily);
+        this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName));
+        this.localLockMediator = llm;
+        Configuration storageConfig = storeManager.getStorageConfig();
+        this.lockExpiryTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_EXPIRE);
+        this.lockMaxWaitTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_WAIT);
+        this.lockMaxRetries = storageConfig.get(GraphDatabaseConfiguration.LOCK_RETRY);
+    }
+
+    /** Does nothing: this store keeps no resources of its own that need releasing. */
+    @Override
+    public void close() throws BackendException {
+        // intentionally empty
+    }
+
+    /**
+     * Reads the slice described by {@code query} for the single key it names.
+     *
+     * @return the entries found for the key, or {@link EntryList#EMPTY_LIST} when the
+     *         lookup produced no result entry
+     */
+    @Override
+    public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
+        List<StaticBuffer> singleKey = Arrays.asList(query.getKey());
+        Filter columnFilter = getFilter(query);
+        Collection<EntryList> slices = getHelper(singleKey, columnFilter).values();
+        return Iterables.getOnlyElement(slices, EntryList.EMPTY_LIST);
+    }
+
+    /**
+     * Reads the slice described by {@code query} for every key in {@code keys}.
+     *
+     * @return a map from key to the entries read for that key
+     */
+    @Override
+    public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
+        final Filter columnFilter = getFilter(query);
+        return getHelper(keys, columnFilter);
+    }
+
+    /**
+     * Applies the given additions and deletions to a single key by wrapping them in a
+     * one-entry mutation map and handing it to {@code mutateMany}.
+     */
+    @Override
+    public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
+        final KCVMutation change = new KCVMutation(additions, deletions);
+        final Map<StaticBuffer, KCVMutation> changesByKey = ImmutableMap.of(key, change);
+        mutateMany(changesByKey, txh);
+    }
+
+    /**
+     * Acquires a local lock on the given key/column pair and records it on the transaction.
+     * <p>
+     * Up to {@code lockMaxRetries} attempts are made through the local lock mediator;
+     * {@link #handleLockFailure} sleeps between failed attempts and throws once the
+     * attempts are exhausted.
+     *
+     * @param key           row key to lock
+     * @param column        column to lock
+     * @param expectedValue value passed on to the transaction's lock bookkeeping
+     * @param txh           transaction requesting the lock; must be an {@code HBaseTransaction}
+     * @throws BackendException if the lock could not be acquired
+     */
+    @Override
+    public void acquireLock(StaticBuffer key,
+                            StaticBuffer column,
+                            StaticBuffer expectedValue,
+                            StoreTransaction txh) throws BackendException {
+
+        KeyColumn lockID = new KeyColumn(key, column);
+        logger.debug("Attempting to acquireLock on {} ", lockID);
+        int trialCount = 0;
+        boolean locked = false;
+        while (!locked && trialCount < lockMaxRetries) {
+            final Timepoint lockStartTime = Timestamps.MILLI.getTime(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+            locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTimeMs));
+            trialCount++;
+            if (locked) {
+                logger.debug("Acquired lock on {}, {}", lockID, txh);
+            } else {
+                handleLockFailure(txh, lockID, trialCount);
+            }
+        }
+        // Reached without a lock only when no attempt was made (lockMaxRetries <= 0);
+        // never record a lock on the transaction that was not actually taken.
+        if (!locked) {
+            throw new PermanentLockingException("Could not lock the keyColumn " +
+                    lockID + " on CF " + Bytes.toString(columnFamilyBytes));
+        }
+        ((HBaseTransaction) txh).updateLocks(lockID, expectedValue);
+    }
+
+    /**
+     * Reacts to a failed lock attempt: waits before the next retry, or gives up.
+     *
+     * @param txh        transaction that requested the lock (used in error messages)
+     * @param lockID     key/column pair that could not be locked
+     * @param trialCount number of attempts made so far, including the one that just failed
+     * @throws PermanentLockingException if {@code trialCount} has reached the configured
+     *         retry limit, or if the thread is interrupted while waiting
+     */
+    void handleLockFailure(StoreTransaction txh, KeyColumn lockID, int trialCount) throws PermanentLockingException {
+        if (trialCount >= lockMaxRetries) {
+            throw new PermanentLockingException("Could not lock the keyColumn " +
+                    lockID + " on CF " + Bytes.toString(columnFamilyBytes));
+        }
+        try {
+            Thread.sleep(lockMaxWaitTimeMs.getLength(TimeUnit.MILLISECONDS));
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so callers further up can still observe it.
+            Thread.currentThread().interrupt();
+            throw new PermanentLockingException(
+                    "Interrupted while waiting for acquiring lock for transaction "
+                            + txh + " lockID " + lockID + " on retry " + trialCount, e);
+        }
+    }
+
+    /**
+     * Scans the rows between the start and end keys of {@code query} and returns an
+     * iterator over their keys, restricted to the column slice of the same query.
+     */
+    @Override
+    public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException {
+        final byte[] startRow = query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY);
+        final byte[] stopRow = query.getKeyEnd().as(StaticBuffer.ARRAY_FACTORY);
+        final FilterList rowFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+        return executeKeySliceQuery(startRow, stopRow, rowFilters, query);
+    }
+
+    /** Returns the store name this instance was constructed with. */
+    @Override
+    public String getName() {
+        return this.storeName;
+    }
+
+    /**
+     * Scans the table without row bounds and returns an iterator over the row keys,
+     * restricted to the column slice of {@code query}.
+     */
+    @Override
+    public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException {
+        final FilterList rowFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+        return executeKeySliceQuery(rowFilters, query);
+    }
+
+    /**
+     * Translates a slice query into an HBase column filter.
+     * <p>
+     * The column range is start-inclusive and end-exclusive. An empty slice start or
+     * slice end is passed to {@link ColumnRangeFilter} as {@code null}, i.e. as an open
+     * bound on that side. When the query carries a limit, the range filter is combined
+     * with a {@link ColumnPaginationFilter} so that both must pass.
+     *
+     * @param query slice query supplying the column bounds and optional limit
+     * @return the filter to attach to a {@code Get} or {@code Scan}
+     */
+    public static Filter getFilter(SliceQuery query) {
+        // Each bound is checked against its own buffer. The start bound used to be guarded
+        // by the END buffer's length, which dropped the lower bound whenever the slice
+        // end was empty.
+        byte[] colStartBytes = query.getSliceStart().length() > 0 ? query.getSliceStart().as(StaticBuffer.ARRAY_FACTORY) : null;
+        byte[] colEndBytes = query.getSliceEnd().length() > 0 ? query.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY) : null;
+
+        Filter filter = new ColumnRangeFilter(colStartBytes, true, colEndBytes, false);
+
+        if (query.hasLimit()) {
+            filter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
+                    filter,
+                    new ColumnPaginationFilter(query.getLimit(), 0));
+        }
+
+        logger.debug("Generated HBase Filter {}", filter);
+
+        return filter;
+    }
+
+ private Map<StaticBuffer,EntryList> getHelper(List<StaticBuffer> keys, Filter getFilter) throws BackendException {
+ List<Get> requests = new ArrayList<Get>(keys.size());
+ {
+ for (StaticBuffer key : keys) {
+ Get g = new Get(key.as(StaticBuffer.ARRAY_FACTORY)).addFamily(columnFamilyBytes).setFilter(getFilter);
+ try {
+ g.setTimeRange(0, Long.MAX_VALUE);
+ } catch (IOException e) {
+ throw new PermanentBackendException(e);
+ }
+ requests.add(g);
+ }
+ }
+
+ Map<StaticBuffer,EntryList> resultMap = new HashMap<StaticBuffer,EntryList>(keys.size());
+
+ try {
+ TableMask table = null;
+ Result[] results = null;
+
+ try {
+ table = cnx.getTable(tableName);
+ logger.debug("Get requests {} {} ", Bytes.toString(columnFamilyBytes), requests.size());
+ results = table.get(requests);
+ logger.debug("Get requests finished {} {} ", Bytes.toString(columnFamilyBytes), requests.size());
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+
+ if (results == null)
+ return KCVSUtil.emptyResults(keys);
+
+ assert results.length==keys.size();
+
+ for (int i = 0; i < results.length; i++) {
+ Result result = results[i];
+ NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> f = result.getMap();
+
+ if (f == null) { // no result for this key
+ resultMap.put(keys.get(i), EntryList.EMPTY_LIST);
+ continue;
+ }
+
+ // actual key with <timestamp, value>
+ NavigableMap<byte[], NavigableMap<Long, byte[]>> r = f.get(columnFamilyBytes);
+ resultMap.put(keys.get(i), (r == null)
+ ? EntryList.EMPTY_LIST
+ : StaticArrayEntryList.ofBytes(r.entrySet(), entryGetter));
+ }
+
+ return resultMap;
+ } catch (IOException e) {
+ throw new TemporaryBackendException(e);
+ }
+ }
+
+ private void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
+ storeManager.mutateMany(ImmutableMap.of(storeName, mutations), txh);
+ }
+
+ private KeyIterator executeKeySliceQuery(FilterList filters, @Nullable SliceQuery columnSlice) throws BackendException {
+ return executeKeySliceQuery(null, null, filters, columnSlice);
+ }
+
+ private KeyIterator executeKeySliceQuery(@Nullable byte[] startKey,
+ @Nullable byte[] endKey,
+ FilterList filters,
+ @Nullable SliceQuery columnSlice) throws BackendException {
+ Scan scan = new Scan().addFamily(columnFamilyBytes);
+
+ try {
+ scan.setTimeRange(0, Long.MAX_VALUE);
+ } catch (IOException e) {
+ throw new PermanentBackendException(e);
+ }
+
+ if (startKey != null)
+ scan.setStartRow(startKey);
+
+ if (endKey != null)
+ scan.setStopRow(endKey);
+
+ if (columnSlice != null) {
+ filters.addFilter(getFilter(columnSlice));
+ }
+
+ TableMask table = null;
+
+ logger.debug("Scan for row keys {} {} ", Bytes.toString(startKey), Bytes.toString(endKey));
+ try {
+ table = cnx.getTable(tableName);
+ return new RowIterator(table, table.getScanner(scan.setFilter(filters)), columnFamilyBytes);
+ } catch (IOException e) {
+ IOUtils.closeQuietly(table);
+ throw new PermanentBackendException(e);
+ }
+ }
+
+ private class RowIterator implements KeyIterator {
+ private final Closeable table;
+ private final Iterator<Result> rows;
+ private final byte[] columnFamilyBytes;
+
+ private Result currentRow;
+ private boolean isClosed;
+
+ public RowIterator(Closeable table, ResultScanner rows, byte[] columnFamilyBytes) {
+ this.table = table;
+ this.columnFamilyBytes = Arrays.copyOf(columnFamilyBytes, columnFamilyBytes.length);
+ this.rows = Iterators.filter(rows.iterator(), new Predicate<Result>() {
+ @Override
+ public boolean apply(@Nullable Result result) {
+ if (result == null)
+ return false;
+
+ try {
+ StaticBuffer id = StaticArrayBuffer.of(result.getRow());
+ id.getLong(0);
+ } catch (NumberFormatException e) {
+ return false;
+ }
+
+ return true;
+ }
+ });
+ }
+
+ @Override
+ public RecordIterator<Entry> getEntries() {
+ ensureOpen();
+
+ return new RecordIterator<Entry>() {
+ private final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> currentMap = currentRow.getMap();
+ private final Iterator<Map.Entry<byte[], NavigableMap<Long, byte[]>>> kv = currentMap == null ? null : currentMap.get(columnFamilyBytes).entrySet().iterator();
+
+ @Override
+ public boolean hasNext() {
+ ensureOpen();
+ return kv == null ? false : kv.hasNext();
+ }
+
+ @Override
+ public Entry next() {
+ ensureOpen();
+ return kv == null ? null : StaticArrayEntry.ofBytes(kv.next(), entryGetter);
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+ public boolean hasNext() {
+ ensureOpen();
+ return rows.hasNext();
+ }
+
+ @Override
+ public StaticBuffer next() {
+ ensureOpen();
+
+ currentRow = rows.next();
+ return StaticArrayBuffer.of(currentRow.getRow());
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(table);
+ isClosed = true;
+ logger.debug("RowIterator closed table {}", table);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ private void ensureOpen() {
+ if (isClosed)
+ throw new IllegalStateException("Iterator has been closed.");
+ }
+ }
+
+    /**
+     * Adapter that reads column name, value and metadata out of one HBase result entry,
+     * i.e. a column qualifier mapped to its timestamp-to-value versions.
+     */
+    private static class HBaseGetter implements StaticArrayEntry.GetColVal<Map.Entry<byte[], NavigableMap<Long, byte[]>>, byte[]> {
+
+        private final EntryMetaData[] schema;
+
+        private HBaseGetter(EntryMetaData[] schema) {
+            this.schema = schema;
+        }
+
+        /** Returns the last entry of the element's timestamp-to-value map. */
+        private Map.Entry<Long, byte[]> lastVersion(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            final NavigableMap<Long, byte[]> versions = element.getValue();
+            return versions.lastEntry();
+        }
+
+        /** Returns the column qualifier, which is the key of the element. */
+        @Override
+        public byte[] getColumn(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            return element.getKey();
+        }
+
+        /** Returns the value stored under the last timestamp of the element. */
+        @Override
+        public byte[] getValue(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            return lastVersion(element).getValue();
+        }
+
+        /** Returns the metadata schema supplied at construction, regardless of the element. */
+        @Override
+        public EntryMetaData[] getMetaSchema(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            return this.schema;
+        }
+
+        /**
+         * Returns the requested metadata for the element; only {@code TIMESTAMP} is
+         * supported, and it is the last timestamp key of the element.
+         *
+         * @throws UnsupportedOperationException for any other metadata kind
+         */
+        @Override
+        public Object getMetaData(Map.Entry<byte[], NavigableMap<Long, byte[]>> element, EntryMetaData meta) {
+            switch (meta) {
+                case TIMESTAMP: {
+                    final Long timestamp = lastVersion(element).getKey();
+                    return timestamp;
+                }
+                default:
+                    throw new UnsupportedOperationException("Unsupported meta data: " + meta);
+            }
+        }
+    }
+}