You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2012/10/03 23:51:38 UTC
svn commit: r1393785 - in /mahout/trunk/integration: pom.xml
src/main/java/org/apache/mahout/cf/taste/impl/model/hbase/
src/main/java/org/apache/mahout/cf/taste/impl/model/hbase/HBaseDataModel.java
Author: srowen
Date: Wed Oct 3 21:51:38 2012
New Revision: 1393785
URL: http://svn.apache.org/viewvc?rev=1393785&view=rev
Log:
MAHOUT-202 add Hbase data model
Added:
mahout/trunk/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/hbase/
mahout/trunk/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/hbase/HBaseDataModel.java
Modified:
mahout/trunk/integration/pom.xml
Modified: mahout/trunk/integration/pom.xml
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/pom.xml?rev=1393785&r1=1393784&r2=1393785&view=diff
==============================================================================
--- mahout/trunk/integration/pom.xml (original)
+++ mahout/trunk/integration/pom.xml Wed Oct 3 21:51:38 2012
@@ -177,6 +177,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <version>0.94.1</version>
+ </dependency>
+
+ <dependency>
<groupId>me.prettyprint</groupId>
<artifactId>hector-core</artifactId>
<version>0.8.0-2</version>
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/hbase/HBaseDataModel.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/hbase/HBaseDataModel.java?rev=1393785&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/hbase/HBaseDataModel.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/hbase/HBaseDataModel.java Wed Oct 3 21:51:38 2012
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.impl.model.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableFactory;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.mahout.cf.taste.common.NoSuchItemException;
+import org.apache.mahout.cf.taste.common.NoSuchUserException;
+import org.apache.mahout.cf.taste.common.Refreshable;
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
+import org.apache.mahout.cf.taste.impl.model.GenericItemPreferenceArray;
+import org.apache.mahout.cf.taste.impl.model.GenericUserPreferenceArray;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+
+/**
+ * <p>Naive approach of storing one preference as one value in the table.
+ * Preferences are indexed as (user, item) and (item, user) for O(1) lookups.</p>
+ *
+ * <p>The default table name is "taste", this can be set through a constructor
+ * argument. Each row has a value starting with "i" or "u" followed by the
+ * actual id encoded as a big endian long.</p>
+ *
+ * <p>E.g., "u\x00\x00\x00\x00\x00\x00\x04\xd2" is user 1234L</p>
+ *
+ * <p>There are two column families: "users" and "items".</p>
+ *
+ * <p>The "users" column family holds user->item preferences. Each userID is the
+ * column qualifier and the value is the preference.</p>
+ *
+ * <p>The "items" column fmaily holds item->user preferences. Each itemID is the
+ * column qualifier and the value is the preference.</p>
+ *
+ * <p>User IDs and item IDs are cached in a FastIDSet since it requires a full
+ * table scan to build these sets. Preferences are not cached since they
+ * are pretty cheap lookups in HBase (also caching the Preferences defeats
+ * the purpose of a scalable storage engine like HBase).</p>
+ */
+public final class HBaseDataModel implements DataModel, Closeable {
+
+ private static final Logger log = LoggerFactory.getLogger(HBaseDataModel.class);
+
+ private static final String DEFAULT_TABLE = "taste";
+ private static final byte[] USERS_CF = Bytes.toBytes("users");
+ private static final byte[] ITEMS_CF = Bytes.toBytes("items");
+
+ private final HTablePool pool;
+ private final String tableName;
+ public final boolean tableWasCreated;
+
+ // Cache of user and item ids
+ private volatile FastIDSet itemIDs;
+ private volatile FastIDSet userIDs;
+
+ public HBaseDataModel(String zkConnect) throws IOException {
+ this(zkConnect, DEFAULT_TABLE);
+ }
+
+ public HBaseDataModel(String zkConnect, String tableName) throws IOException {
+ log.info("Using HBase table {}", tableName);
+ Configuration conf = HBaseConfiguration.create();
+ conf.set("hbase.zookeeper.quorum", zkConnect);
+ HTableFactory tableFactory = new HTableFactory();
+ this.pool = new HTablePool(conf, 8, tableFactory);
+ this.tableName = tableName;
+ this.tableWasCreated = bootstrap(conf);
+ // Warm the cache
+ refresh(null);
+ }
+
+ public HBaseDataModel(HTablePool pool, String tableName, Configuration conf) throws IOException {
+ log.info("Using HBase table {}", tableName);
+ this.pool = pool;
+ this.tableName = tableName;
+ this.tableWasCreated = bootstrap(conf);
+
+ // Warm the cache
+ refresh(null);
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * Create the table if it doesn't exist
+ */
+ private boolean bootstrap(Configuration conf) throws IOException {
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ HTableDescriptor tDesc = new HTableDescriptor(Bytes.toBytes(tableName));
+ tDesc.addFamily(new HColumnDescriptor(USERS_CF));
+ tDesc.addFamily(new HColumnDescriptor(ITEMS_CF));
+ try {
+ admin.createTable(tDesc);
+ log.info("Created table {}", tableName);
+ return true;
+ } catch (TableExistsException e) {
+ log.info("Table {} alreay exists", tableName);
+ return false;
+ } finally {
+ admin.close();
+ }
+ }
+
+ /**
+ * Prefix a user id with "u" and convert to byte[]
+ */
+ private static byte[] userToBytes(long userID) {
+ ByteBuffer bb = ByteBuffer.allocate(9);
+ bb.put((byte)0x75); // The letter "u"
+ bb.putLong(userID);
+ return bb.array();
+ }
+
+ /**
+ * Prefix an item id with "i" and convert to byte[]
+ */
+ private static byte[] itemToBytes(long itemID) {
+ ByteBuffer bb = ByteBuffer.allocate(9);
+ bb.put((byte)0x69); // The letter "i"
+ bb.putLong(itemID);
+ return bb.array();
+ }
+
+ /**
+ * Extract the id out of a prefix byte[] id
+ */
+ private static long bytesToUserOrItemID(byte[] ba) {
+ ByteBuffer bb = ByteBuffer.wrap(ba);
+ return bb.getLong(1);
+ }
+
+ /* DataModel interface */
+
+ @Override
+ public LongPrimitiveIterator getUserIDs() {
+ return userIDs.iterator();
+ }
+
+ @Override
+ public PreferenceArray getPreferencesFromUser(long userID) throws TasteException {
+ Result result;
+ try {
+ HTableInterface table = pool.getTable(tableName);
+ Get get = new Get(userToBytes(userID));
+ get.addFamily(ITEMS_CF);
+ result = table.get(get);
+ table.close();
+ } catch (IOException e) {
+ throw new TasteException("Failed to retrieve user preferences from HBase", e);
+ }
+
+ if (result.isEmpty()) {
+ throw new NoSuchUserException(userID);
+ }
+
+ SortedMap<byte[], byte[]> families = result.getFamilyMap(ITEMS_CF);
+ PreferenceArray prefs = new GenericUserPreferenceArray(families.size());
+ prefs.setUserID(0, userID);
+ int i = 0;
+ for (Map.Entry<byte[], byte[]> entry : families.entrySet()) {
+ prefs.setItemID(i, Bytes.toLong(entry.getKey()));
+ prefs.setValue(i, Bytes.toFloat(entry.getValue()));
+ i++;
+ }
+ return prefs;
+ }
+
+ @Override
+ public FastIDSet getItemIDsFromUser(long userID) throws TasteException {
+ Result result;
+ try {
+ HTableInterface table = pool.getTable(tableName);
+ Get get = new Get(userToBytes(userID));
+ get.addFamily(ITEMS_CF);
+ result = table.get(get);
+ table.close();
+ } catch (IOException e) {
+ throw new TasteException("Failed to retrieve item IDs from HBase", e);
+ }
+
+ if (result.isEmpty()) {
+ throw new NoSuchUserException(userID);
+ }
+
+ SortedMap<byte[],byte[]> families = result.getFamilyMap(ITEMS_CF);
+ FastIDSet ids = new FastIDSet(families.size());
+ for (byte[] family: families.keySet()) {
+ ids.add(Bytes.toLong(family));
+ }
+ return ids;
+ }
+
+ @Override
+ public LongPrimitiveIterator getItemIDs() {
+ return itemIDs.iterator();
+ }
+
+ @Override
+ public PreferenceArray getPreferencesForItem(long itemID) throws TasteException {
+ Result result;
+ try {
+ HTableInterface table = pool.getTable(tableName);
+ Get get = new Get(itemToBytes(itemID));
+ get.addFamily(USERS_CF);
+ result = table.get(get);
+ table.close();
+ } catch (IOException e) {
+ throw new TasteException("Failed to retrieve item preferences from HBase", e);
+ }
+
+ if (result.isEmpty()) {
+ throw new NoSuchItemException(itemID);
+ }
+
+ SortedMap<byte[], byte[]> families = result.getFamilyMap(USERS_CF);
+ PreferenceArray prefs = new GenericItemPreferenceArray(families.size());
+ prefs.setItemID(0, itemID);
+ int i = 0;
+ for (Map.Entry<byte[], byte[]> entry : families.entrySet()) {
+ prefs.setUserID(i, Bytes.toLong(entry.getKey()));
+ prefs.setValue(i, Bytes.toFloat(entry.getValue()));
+ i++;
+ }
+ return prefs;
+ }
+
+ @Override
+ public Float getPreferenceValue(long userID, long itemID) throws TasteException {
+ Result result;
+ try {
+ HTableInterface table = pool.getTable(tableName);
+ Get get = new Get(userToBytes(userID));
+ get.addColumn(ITEMS_CF, Bytes.toBytes(itemID));
+ result = table.get(get);
+ table.close();
+ } catch (IOException e) {
+ throw new TasteException("Failed to retrieve user preferences from HBase", e);
+ }
+
+ if (result.isEmpty()) {
+ throw new NoSuchUserException(userID);
+ }
+
+ if (result.containsColumn(ITEMS_CF, Bytes.toBytes(itemID))) {
+ return Bytes.toFloat(result.getValue(ITEMS_CF, Bytes.toBytes(itemID)));
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Long getPreferenceTime(long userID, long itemID) throws TasteException {
+ Result result;
+ try {
+ HTableInterface table = pool.getTable(tableName);
+ Get get = new Get(userToBytes(userID));
+ get.addColumn(ITEMS_CF, Bytes.toBytes(itemID));
+ result = table.get(get);
+ table.close();
+ } catch (IOException e) {
+ throw new TasteException("Failed to retrieve user preferences from HBase", e);
+ }
+
+ if (result.isEmpty()) {
+ throw new NoSuchUserException(userID);
+ }
+
+ if (result.containsColumn(ITEMS_CF, Bytes.toBytes(itemID))) {
+ KeyValue kv = result.getColumnLatest(ITEMS_CF, Bytes.toBytes(itemID));
+ return kv.getTimestamp();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public int getNumItems() {
+ return itemIDs.size();
+ }
+
+ @Override
+ public int getNumUsers() {
+ return userIDs.size();
+ }
+
+ @Override
+ public int getNumUsersWithPreferenceFor(long itemID) throws TasteException {
+ PreferenceArray prefs = getPreferencesForItem(itemID);
+ return prefs.length();
+ }
+
+ @Override
+ public int getNumUsersWithPreferenceFor(long itemID1, long itemID2) throws TasteException {
+ Result[] results;
+ try {
+ HTableInterface table = pool.getTable(tableName);
+ List<Get> gets = new ArrayList<Get>(2);
+ gets.add(new Get(itemToBytes(itemID1)));
+ gets.add(new Get(itemToBytes(itemID2)));
+ gets.get(0).addFamily(USERS_CF);
+ gets.get(1).addFamily(USERS_CF);
+ results = table.get(gets);
+ table.close();
+ } catch (IOException e) {
+ throw new TasteException("Failed to retrieve item preferences from HBase", e);
+ }
+
+ if (results[0].isEmpty()) {
+ throw new NoSuchItemException(itemID1);
+ }
+ if (results[1].isEmpty()) {
+ throw new NoSuchItemException(itemID2);
+ }
+
+ // First item
+ Result result = results[0];
+ SortedMap<byte[], byte[]> families = result.getFamilyMap(USERS_CF);
+ FastIDSet idSet1 = new FastIDSet(families.size());
+ for (byte[] id : families.keySet()) {
+ idSet1.add(Bytes.toLong(id));
+ }
+
+ // Second item
+ result = results[1];
+ families = result.getFamilyMap(USERS_CF);
+ FastIDSet idSet2 = new FastIDSet(families.size());
+ for (byte[] id : families.keySet()) {
+ idSet2.add(Bytes.toLong(id));
+ }
+
+ return idSet1.intersectionSize(idSet2);
+ }
+
+ @Override
+ public void setPreference(long userID, long itemID, float value) throws TasteException {
+ try {
+ HTableInterface table = pool.getTable(tableName);
+ List<Put> puts = new ArrayList<Put>(2);
+ puts.add(new Put(userToBytes(userID)));
+ puts.add(new Put(itemToBytes(itemID)));
+ puts.get(0).add(ITEMS_CF, Bytes.toBytes(itemID), Bytes.toBytes(value));
+ puts.get(1).add(USERS_CF, Bytes.toBytes(userID), Bytes.toBytes(value));
+ table.put(puts);
+ table.close();
+ } catch (IOException e) {
+ throw new TasteException("Failed to store preference in HBase", e);
+ }
+ }
+
+ @Override
+ public void removePreference(long userID, long itemID) throws TasteException {
+ try {
+ HTableInterface table = pool.getTable(tableName);
+ List<Delete> deletes = new ArrayList<Delete>(2);
+ deletes.add(new Delete(userToBytes(userID)));
+ deletes.add(new Delete(itemToBytes(itemID)));
+ deletes.get(0).deleteColumns(ITEMS_CF, Bytes.toBytes(itemID));
+ deletes.get(1).deleteColumns(USERS_CF, Bytes.toBytes(userID));
+ table.delete(deletes);
+ table.close();
+ } catch (IOException e) {
+ throw new TasteException("Failed to remove preference from HBase", e);
+ }
+ }
+
+ @Override
+ public boolean hasPreferenceValues() {
+ return true;
+ }
+
+ @Override
+ public float getMaxPreference() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public float getMinPreference() {
+ throw new UnsupportedOperationException();
+ }
+
+ /* Closeable interface */
+
+ @Override
+ public void close() throws IOException {
+ pool.close();
+ }
+
+ /* Refreshable interface */
+
+ @Override
+ public void refresh(Collection<Refreshable> alreadyRefreshed) {
+ if (alreadyRefreshed == null || !alreadyRefreshed.contains(this)) {
+ try {
+ log.info("Refreshing item and user ID caches");
+ long t1 = System.currentTimeMillis();
+ refreshItemIDs();
+ refreshUserIDs();
+ long t2 = System.currentTimeMillis();
+ log.info("Finished refreshing caches in {} ms", t2 - t1);
+ } catch (IOException e) {
+ throw new IllegalStateException("Could not reload DataModel", e);
+ }
+ }
+ }
+
+ /*
+ * Refresh the item id cache. Warning: this does a large table scan
+ */
+ private synchronized void refreshItemIDs() throws IOException {
+ // Get the list of item ids
+ HTableInterface table = pool.getTable(tableName);
+ Scan scan = new Scan(new byte[]{0x69}, new byte[]{0x70});
+ scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, new KeyOnlyFilter(), new FirstKeyOnlyFilter()));
+ ResultScanner scanner = table.getScanner(scan);
+ Collection<Long> ids = new LinkedList<Long>();
+ for (Result result : scanner) {
+ ids.add(bytesToUserOrItemID(result.getRow()));
+ }
+ table.close();
+
+ // Copy into FastIDSet
+ FastIDSet itemIDs = new FastIDSet(ids.size());
+ for (long l : ids) {
+ itemIDs.add(l);
+ }
+
+ // Swap with the active
+ this.itemIDs = itemIDs;
+ }
+
+ /*
+ * Refresh the user id cache. Warning: this does a large table scan
+ */
+ private synchronized void refreshUserIDs() throws IOException {
+ // Get the list of user ids
+ HTableInterface table = pool.getTable(tableName);
+ Scan scan = new Scan(new byte[]{0x75}, new byte[]{0x76});
+ scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, new KeyOnlyFilter(), new FirstKeyOnlyFilter()));
+ ResultScanner scanner = table.getScanner(scan);
+ Collection<Long> ids = new LinkedList<Long>();
+ for (Result result : scanner) {
+ ids.add(bytesToUserOrItemID(result.getRow()));
+ }
+ table.close();
+
+ // Copy into FastIDSet
+ FastIDSet userIDs = new FastIDSet(ids.size());
+ for(long l : ids) {
+ userIDs.add(l);
+ }
+
+ // Swap with the active
+ this.userIDs = userIDs;
+ }
+
+}