Posted to reviews@spark.apache.org by tmalaska <gi...@git.apache.org> on 2014/07/27 05:20:56 UTC

[GitHub] spark pull request: Spark-2447 : Spark on HBase

GitHub user tmalaska opened a pull request:

    https://github.com/apache/spark/pull/1608

    Spark-2447 : Spark on HBase

    Add a common solution for sending upsert actions to HBase (puts, deletes,
    and increments)
    
    This is the first pull request, mainly to test the review process; there are still a number of things I plan to add this week.
    
    1. Clean up the pom file
    2. Add unit tests for the HConnectionStaticCache
    
    If I have time I will also add the following:
    1. Support for Java
    2. Additional unit tests for Java
    3. Additional unit tests for Spark Streaming
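
    For anyone skimming the thread, here is a minimal usage sketch of the
    proposed API (the HBaseContext constructor and bulkPut signature are taken
    from the diff quoted later in this thread; the table name, column family,
    and sample data are made up for illustration):

        import org.apache.hadoop.hbase.HBaseConfiguration
        import org.apache.hadoop.hbase.client.Put
        import org.apache.hadoop.hbase.util.Bytes
        import org.apache.spark.{SparkConf, SparkContext}
        import org.apache.spark.hbase.HBaseContext

        object BulkPutSketch {
          def main(args: Array[String]): Unit = {
            val sc = new SparkContext(new SparkConf().setAppName("bulkPut sketch"))

            // (rowKey, value) pairs to upsert into a pre-existing table "t1"
            val rdd = sc.parallelize(Seq(("row1", "v1"), ("row2", "v2")))

            // HBaseContext broadcasts the HBase configuration to the executors
            // and manages the HConnection life cycle on their behalf
            val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

            // convert each record to a Put; autoFlush left off so puts are buffered
            hbaseContext.bulkPut[(String, String)](rdd, "t1",
              (r) => {
                val put = new Put(Bytes.toBytes(r._1))
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes(r._2))
                put
              },
              autoFlush = false)

            sc.stop()
          }
        }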

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tmalaska/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1608.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1608
    
----
commit 6d9c733d4f177292cfc2fda15a6059660bd500f3
Author: tmalaska <te...@cloudera.com>
Date:   2014-07-27T03:17:06Z

    Spark-2447 : Spark on HBase
    
    Add a common solution for sending upsert actions to HBase (puts, deletes,
    and increments)

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1608#issuecomment-50254637
  
    QA results for PR 1608:
    - This patch FAILED unit tests.
    - This patch merges cleanly
    - This patch adds the following public classes (experimental):
        @serializable class HBaseContext(@transient sc: SparkContext,
        protected class hconnectionCleanerTask extends TimerTask {

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17237/consoleFull



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by saintstack <gi...@git.apache.org>.
Github user saintstack commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r16606416
  
    --- Diff: external/hbase/pom.xml ---
    @@ -0,0 +1,140 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    + ~ Licensed to the Apache Software Foundation (ASF) under one or more
    + ~ contributor license agreements. See the NOTICE file distributed with
    + ~ this work for additional information regarding copyright ownership.
    + ~ The ASF licenses this file to You under the Apache License, Version 2.0
    + ~ (the "License"); you may not use this file except in compliance with
    + ~ the License. You may obtain a copy of the License at
    + ~
    + ~  http://www.apache.org/licenses/LICENSE-2.0
    + ~
    + ~ Unless required by applicable law or agreed to in writing, software
    + ~ distributed under the License is distributed on an "AS IS" BASIS,
    + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + ~ See the License for the specific language governing permissions and
    + ~ limitations under the License.
    + -->
    + 
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    + <modelVersion>4.0.0</modelVersion>
    +  <parent>
    +   <groupId>org.apache.spark</groupId>
    +   <artifactId>spark-parent</artifactId>
    +   <version>1.1.0-SNAPSHOT</version>
    +   <relativePath>../../pom.xml</relativePath>
    +  </parent>
    +
    +  <groupId>org.apache.spark</groupId>
    +  <artifactId>spark-hbase_2.10</artifactId>
    +  <properties>
    +    <sbt.project.name>spark-hbase</sbt.project.name>
    +  </properties>
    +  <packaging>jar</packaging>
    +  <name>Spark Project External Flume</name>
    +  <url>http://spark.apache.org/</url>
    +
    +  <dependencies>
    +    <dependency>
    +      <groupId>org.scalatest</groupId>
    +      <artifactId>scalatest_${scala.binary.version}</artifactId>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.spark</groupId>
    +      <artifactId>spark-core_${scala.binary.version}</artifactId>
    +      <version>${project.version}</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.spark</groupId>
    +      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
    +      <version>${project.version}</version>
    +      <type>test-jar</type>
    +      <classifier>tests</classifier>
    +      <scope>test</scope>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.spark</groupId>
    +      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
    +      <version>${project.version}</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.hbase</groupId>
    +      <artifactId>hbase-client</artifactId>
    +      <version>${hbase-new.version}</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.hbase</groupId>
    +      <artifactId>hbase-client</artifactId>
    +      <version>${hbase-new.version}</version>
    +      <type>test-jar</type>
    +      <classifier>tests</classifier>
    +      <scope>test</scope>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.hbase</groupId>
    +      <artifactId>hbase-server</artifactId>
    +      <version>${hbase-new.version}</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.hbase</groupId>
    +      <artifactId>hbase-server</artifactId>
    +      <version>${hbase-new.version}</version>
    +      <type>test-jar</type>
    +      <classifier>tests</classifier> 
    +      <scope>test</scope>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.hbase</groupId>
    +      <artifactId>hbase-hadoop1-compat</artifactId>
    --- End diff --
    
    Can we do hadoop2 instead?



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by saintstack <gi...@git.apache.org>.
Github user saintstack commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r16606202
  
    --- Diff: external/hbase/pom.xml ---
    @@ -0,0 +1,140 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    + ~ Licensed to the Apache Software Foundation (ASF) under one or more
    + ~ contributor license agreements. See the NOTICE file distributed with
    + ~ this work for additional information regarding copyright ownership.
    + ~ The ASF licenses this file to You under the Apache License, Version 2.0
    + ~ (the "License"); you may not use this file except in compliance with
    + ~ the License. You may obtain a copy of the License at
    + ~
    + ~  http://www.apache.org/licenses/LICENSE-2.0
    + ~
    + ~ Unless required by applicable law or agreed to in writing, software
    + ~ distributed under the License is distributed on an "AS IS" BASIS,
    + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + ~ See the License for the specific language governing permissions and
    + ~ limitations under the License.
    + -->
    + 
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    + <modelVersion>4.0.0</modelVersion>
    +  <parent>
    +   <groupId>org.apache.spark</groupId>
    +   <artifactId>spark-parent</artifactId>
    +   <version>1.1.0-SNAPSHOT</version>
    +   <relativePath>../../pom.xml</relativePath>
    +  </parent>
    +
    +  <groupId>org.apache.spark</groupId>
    +  <artifactId>spark-hbase_2.10</artifactId>
    +  <properties>
    +    <sbt.project.name>spark-hbase</sbt.project.name>
    +  </properties>
    +  <packaging>jar</packaging>
    +  <name>Spark Project External Flume</name>
    --- End diff --
    
    Is the mention of flume here intentional?




[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15438477
  
    --- Diff: external/hbase/src/test/scala/org/apache/spark/hbase/LocalSparkContext.scala ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import _root_.io.netty.util.internal.logging.{Slf4JLoggerFactory, InternalLoggerFactory}
    +import org.scalatest.BeforeAndAfterAll
    +import org.scalatest.BeforeAndAfterEach
    +import org.scalatest.Suite
    +import org.apache.spark.SparkContext
    +
    +/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */
    +trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite =>
    +
    +  @transient var sc: SparkContext = _
    +
    +  override def beforeAll() {
    +    InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
    --- End diff --
    
    Curious what this is about?



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by saintstack <gi...@git.apache.org>.
Github user saintstack commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r16606704
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala ---
    @@ -0,0 +1,538 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.HBaseConfiguration
    +import org.apache.spark.rdd.RDD
    +import java.io.ByteArrayOutputStream
    +import java.io.DataOutputStream
    +import java.io.ByteArrayInputStream
    +import java.io.DataInputStream
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import org.apache.spark.api.java.JavaPairRDD
    +import java.io.OutputStream
    +import org.apache.hadoop.hbase.client.HTable
    +import org.apache.hadoop.hbase.client.Scan
    +import org.apache.hadoop.hbase.client.Get
    +import java.util.ArrayList
    +import org.apache.hadoop.hbase.client.Result
    +import scala.reflect.ClassTag
    +import org.apache.hadoop.hbase.client.HConnection
    +import org.apache.hadoop.hbase.client.Put
    +import org.apache.hadoop.hbase.client.Increment
    +import org.apache.hadoop.hbase.client.Delete
    +import org.apache.spark.SparkContext
    +import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    +import org.apache.hadoop.hbase.util.Bytes
    +import org.apache.hadoop.mapreduce.Job
    +import org.apache.hadoop.hbase.mapreduce.TableMapper
    +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
    +import org.apache.hadoop.hbase.util.Base64
    +import org.apache.spark.rdd.HadoopRDD
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.SerializableWritable
    +import java.util.HashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +import org.apache.hadoop.hbase.HConstants
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import java.util.TimerTask
    +import org.apache.hadoop.hbase.client.Mutation
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * HBaseContext is a façade of simple and complex HBase operations
    + * like bulk put, get, increment, delete, and scan
    + *
    + * HBase Context will take the responsibilities to happen to
    + * complexity of disseminating the configuration information
    + * to the working and managing the life cycle of HConnections.
    --- End diff --
    
    This paragraph needs an edit




[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by saintstack <gi...@git.apache.org>.
Github user saintstack commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r16607208
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala ---
    @@ -0,0 +1,538 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.HBaseConfiguration
    +import org.apache.spark.rdd.RDD
    +import java.io.ByteArrayOutputStream
    +import java.io.DataOutputStream
    +import java.io.ByteArrayInputStream
    +import java.io.DataInputStream
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import org.apache.spark.api.java.JavaPairRDD
    +import java.io.OutputStream
    +import org.apache.hadoop.hbase.client.HTable
    +import org.apache.hadoop.hbase.client.Scan
    +import org.apache.hadoop.hbase.client.Get
    +import java.util.ArrayList
    +import org.apache.hadoop.hbase.client.Result
    +import scala.reflect.ClassTag
    +import org.apache.hadoop.hbase.client.HConnection
    +import org.apache.hadoop.hbase.client.Put
    +import org.apache.hadoop.hbase.client.Increment
    +import org.apache.hadoop.hbase.client.Delete
    +import org.apache.spark.SparkContext
    +import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    +import org.apache.hadoop.hbase.util.Bytes
    +import org.apache.hadoop.mapreduce.Job
    +import org.apache.hadoop.hbase.mapreduce.TableMapper
    +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
    +import org.apache.hadoop.hbase.util.Base64
    +import org.apache.spark.rdd.HadoopRDD
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.SerializableWritable
    +import java.util.HashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +import org.apache.hadoop.hbase.HConstants
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import java.util.TimerTask
    +import org.apache.hadoop.hbase.client.Mutation
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * HBaseContext is a façade of simple and complex HBase operations
    + * like bulk put, get, increment, delete, and scan
    + *
    + * HBase Context will take the responsibilities to happen to
    + * complexity of disseminating the configuration information
    + * to the working and managing the life cycle of HConnections.
    + *
    + * First constructor:
    + *  @param sc              Active SparkContext
    + *  @param broadcastedConf This is a Broadcast object that holds a
    + * serializable Configuration object
    + *
    + */
    +@serializable class HBaseContext(@transient sc: SparkContext,
    +  broadcastedConf: Broadcast[SerializableWritable[Configuration]]) {
    +
    +  /**
    +   * Second constructor option:
    +   *  @param sc     Active SparkContext
    +   *  @param config Configuration object to make connection to HBase
    +   */
    +  def this(@transient sc: SparkContext, @transient config: Configuration) {
    +    this(sc, sc.broadcast(new SerializableWritable(config)))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD foreachPartition.
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param RDD  Original RDD with data to iterate over
    +   * @param f    Function to be given a iterator to iterate through
    +   *             the RDD values and a HConnection object to interact 
    +   *             with HBase 
    +   */
    +  def foreachPartition[T](rdd: RDD[T],
    +    f: (Iterator[T], HConnection) => Unit) = {
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition(broadcastedConf, it, f))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark Streaming dStream foreach
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param DStream     Original DStream with data to iterate over
    +   * @param f           Function to be given a iterator to iterate through
    +   *                    the DStream values and a HConnection object to 
    +   *                    interact with HBase 
    +   */
    +  def streamForeach[T](dstream: DStream[T],
    +    f: (Iterator[T], HConnection) => Unit) = {
    +    dstream.foreach((rdd, time) => {
    +      foreachPartition(rdd, f)
    +    })
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD mapPartition.
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * Note: Make sure to partition correctly to avoid memory issue when
    +   *       getting data from HBase
    +   * 
    +   * @param RDD     Original RDD with data to iterate over
    +   * @param mp      Function to be given a iterator to iterate through
    +   *                the RDD values and a HConnection object to interact 
    +   *                with HBase
    +   * @return        Returns a new RDD generated by the user definition
    +   *                function just like normal mapPartition
    +   */
    +  def mapPartition[T, U: ClassTag](rdd: RDD[T],
    +    mp: (Iterator[T], HConnection) => Iterator[U]): RDD[U] = {
    +
    +    rdd.mapPartitions[U](it => hbaseMapPartition[T, U](broadcastedConf,
    +      it,
    +      mp), true)
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark Streaming DStream
    +   * mapPartition.
    +   * 
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * Note: Make sure to partition correctly to avoid memory issue when
    +   *       getting data from HBase
    +   * 
    +   * @param DStream    Original DStream with data to iterate over
    +   * @param mp         Function to be given a iterator to iterate through
    +   *                   the DStream values and a HConnection object to 
    +   *                   interact with HBase
    +   * @return           Returns a new DStream generated by the user 
    +   *                   definition function just like normal mapPartition
    +   */
    +  def streamMap[T, U: ClassTag](dstream: DStream[T],
    +    mp: (Iterator[T], HConnection) => Iterator[U]): DStream[U] = {
    +
    +    dstream.mapPartitions(it => hbaseMapPartition[T, U](broadcastedConf,
    +      it,
    +      mp), true)
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.foreachPartition method.
    +   * 
    +   * It allow addition support for a user to take RDD 
    +   * and generate puts and send them to HBase.  
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param RDD        Original RDD with data to iterate over
    +   * @param tableNm  The name of the table to put into 
    +   * @param f          Function to convert a value in the RDD to a HBase Put
    +   * @autoFlush        If autoFlush should be turned on
    +   */
    +  def bulkPut[T](rdd: RDD[T], tableNm: String, f: (T) => Put, autoFlush: Boolean) {
    +
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition[T](
    +        broadcastedConf,
    +        it,
    +        (iterator, hConnection) => {
    +          val htable = hConnection.getTable(tableNm)
    +          htable.setAutoFlush(autoFlush, true)
    +          iterator.foreach(T => htable.put(f(T)))
    +          htable.flushCommits()
    +          htable.close()
    +        }))
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.streamMapPartition method.
    +   * 
    +   * It allow addition support for a user to take a DStream and 
    +   * generate puts and send them to HBase.  
    +   * 
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param DStream    Original DStream with data to iterate over
    +   * @param tableNm  The name of the table to put into 
    +   * @param f          Function to convert a value in the RDD to a HBase Put
    +   * @autoFlush        If autoFlush should be turned on
    +   */
    +  def streamBulkPut[T](dstream: DStream[T],
    +    tableNm: String,
    +    f: (T) => Put,
    +    autoFlush: Boolean) = {
    +    dstream.foreach((rdd, time) => {
    +      bulkPut(rdd, tableNm, f, autoFlush)
    +    })
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.foreachPartition method.
    +   * 
    +   * It allow addition support for a user to take a RDD and 
    +   * generate increments and send them to HBase.  
    +   * 
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param RDD        Original RDD with data to iterate over
    +   * @param tableNm  The name of the table to increment to 
    +   * @param f          Function to convert a value in the RDD to a 
    +   *                   HBase Increments
    +   * @batchSize        The number of increments to batch before sending to HBase
    +   */
    +  def bulkIncrement[T](rdd: RDD[T], tableNm:String, f:(T) => Increment, batchSize: Int) {
    +    bulkMutation(rdd, tableNm, f, batchSize)
    +  }
    +  
    +  /**
    +   * A simple abstraction over the HBaseContext.foreachPartition method.
    +   * 
    +   * It allow addition support for a user to take a RDD and generate delete
    +   * and send them to HBase.  The complexity of even the HConnection is 
    +   * removed from the developer
    +   *  
    +   * @param RDD     Original RDD with data to iterate over
    +   * @param tableNm     The name of the table to delete from 
    +   * @param f       function to convert a value in the RDD to a 
    +   *                HBase Deletes
    +   * @batchSize     The number of delete to batch before sending to HBase
    +   */
    +  def bulkDelete[T](rdd: RDD[T], tableNm:String, f:(T) => Delete, batchSize: Int) {
    +    bulkMutation(rdd, tableNm, f, batchSize)
    +  }
    +  
    +  /** 
    +   *  Under lining function to support all bulk mutations
    +   *  
    +   *  May be opened up if requested
    +   */
    +  private def bulkMutation[T](rdd: RDD[T], tableNm: String, f: (T) => Mutation, batchSize: Int) {
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition[T](
    +        broadcastedConf,
    +        it,
    +        (iterator, hConnection) => {
    +          val htable = hConnection.getTable(tableNm)
    +          val mutationList = new ArrayList[Mutation]
    +          iterator.foreach(T => {
    +            mutationList.add(f(T))
    +            if (mutationList.size >= batchSize) {
    +              htable.batch(mutationList)
    +              mutationList.clear()
    +            }
    +          })
    +          if (mutationList.size() > 0) {
    +            htable.batch(mutationList)
    +            mutationList.clear()
    +          }
    +          htable.close()
    +        }))
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.streamForeach method.
    +   * 
    +   * It allow addition support for a user to take a DStream and 
    +   * generate Increments and send them to HBase.  
    +   * 
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param DStream    Original DStream with data to iterate over
    +   * @param tableNm  The name of the table to increments into 
    +   * @param f          function to convert a value in the RDD to a 
    +   *                   HBase Increments
    +   * @batchSize        The number of increments to batch before sending to HBase
    +   */
    +  def streamBulkIncrement[T](dstream: DStream[T],
    +    tableNm: String,
    +    f: (T) => Increment,
    +    batchSize: Int) = {
    +    streamBulkMutation(dstream, tableNm, f, batchSize)
    +  }
    +  
    +  /**
    +   * A simple abstraction over the HBaseContext.streamForeach method.
    +   * 
    +   * It allow addition support for a user to take a DStream and 
    +   * generate Delete and send them to HBase.  
    +   * 
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param DStream   Original DStream with data to iterate over
    +   * @param tableNm The name of the table to delete from 
    +   * @param f         function to convert a value in the RDD to a 
    +   *                  HBase Delete
    +   * @batchSize       The number of deletes to batch before sending to HBase
    +   */
    +  def streamBulkDelete[T](dstream: DStream[T],
    +    tableNm: String,
    +    f: (T) => Delete,
    +    batchSize: Int) = {
    +    streamBulkMutation(dstream, tableNm, f, batchSize)
    +  }
    +  
    +  /** 
    +   *  Under lining function to support all bulk streaming mutations
    +   *  
    +   *  May be opened up if requested
    +   */
    +  private def streamBulkMutation[T](dstream: DStream[T],
    +    tableNm: String,
    +    f: (T) => Mutation,
    +    batchSize: Int) = {
    +    dstream.foreach((rdd, time) => {
    +      bulkMutation(rdd, tableNm, f, batchSize)
    +    })
    +  }
    +
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.mapPartition method.
    +   * 
    +   * It allow addition support for a user to take a RDD and generates a
    +   * new RDD based on Gets and the results they bring back from HBase
    --- End diff --
    
    I know what this is about, having read your nice design doc.  Should there be a few more notes in here on why this is needed?
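
    For readers following along, below is a rough sketch of the Get-based
    enrichment this abstraction saves users from hand-rolling with the
    lower-level mapPartition quoted above. It is only an illustration under
    assumed names (the table "t1" and the helper enrichWithGets are made up),
    not the API proposed in the PR:

        import org.apache.hadoop.hbase.client.{Get, Result}
        import org.apache.spark.rdd.RDD
        import org.apache.spark.hbase.HBaseContext

        // Assumes an existing HBaseContext and an RDD of HBase row keys
        def enrichWithGets(hbaseContext: HBaseContext,
                           rowKeyRdd: RDD[Array[Byte]]): RDD[(Array[Byte], Result)] = {
          hbaseContext.mapPartition[Array[Byte], (Array[Byte], Result)](
            rowKeyRdd,
            (it, hConnection) => {
              val htable = hConnection.getTable("t1")
              // materialize the results before closing the table
              val results = it.map(key => (key, htable.get(new Get(key)))).toList
              htable.close()
              results.iterator
            })
        }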




[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1608#issuecomment-50420751
  
    QA results for PR 1608:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds the following public classes (experimental):
        @serializable class HBaseContext(@transient sc: SparkContext,
        protected class hconnectionCleanerTask extends TimerTask {

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17316/consoleFull



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r16606710
  
    --- Diff: external/hbase/pom.xml ---
    @@ -0,0 +1,140 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    + ~ Licensed to the Apache Software Foundation (ASF) under one or more
    + ~ contributor license agreements. See the NOTICE file distributed with
    + ~ this work for additional information regarding copyright ownership.
    + ~ The ASF licenses this file to You under the Apache License, Version 2.0
    + ~ (the "License"); you may not use this file except in compliance with
    + ~ the License. You may obtain a copy of the License at
    + ~
    + ~  http://www.apache.org/licenses/LICENSE-2.0
    + ~
    + ~ Unless required by applicable law or agreed to in writing, software
    + ~ distributed under the License is distributed on an "AS IS" BASIS,
    + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + ~ See the License for the specific language governing permissions and
    + ~ limitations under the License.
    + -->
    + 
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    + <modelVersion>4.0.0</modelVersion>
    +  <parent>
    +   <groupId>org.apache.spark</groupId>
    +   <artifactId>spark-parent</artifactId>
    +   <version>1.1.0-SNAPSHOT</version>
    +   <relativePath>../../pom.xml</relativePath>
    +  </parent>
    +
    +  <groupId>org.apache.spark</groupId>
    +  <artifactId>spark-hbase_2.10</artifactId>
    --- End diff --
    
    That's the Scala version




[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1608#issuecomment-50541560
  
    QA tests have started for PR 1608. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17382/consoleFull



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15438420
  
    --- Diff: external/hbase/pom.xml ---
    @@ -0,0 +1,217 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +      <modelVersion>4.0.0</modelVersion>
    +        <parent>
    +          <groupId>org.apache.spark</groupId>
    +          <artifactId>spark-parent</artifactId>
    +          <version>1.1.0-SNAPSHOT</version>
    +          <relativePath>../../pom.xml</relativePath>
    +        </parent>
    +
    +        <groupId>org.apache.spark</groupId>
    +        <artifactId>spark-hbase_2.10</artifactId>
    +        <properties>
    +           <sbt.project.name>spark-hbase</sbt.project.name>
    +        </properties>
    +        <packaging>jar</packaging>
    +        <name>Spark Project External Flume</name>
    +        <url>http://spark.apache.org/</url>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.scalatest</groupId>
    +			<artifactId>scalatest_${scala.binary.version}</artifactId>
    +			<version>2.2.0</version>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>com.google.guava</groupId>
    +			<artifactId>guava</artifactId>
    +			<version>14.0.1</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hadoop</groupId>
    +			<artifactId>hadoop-client</artifactId>
    +			<version>2.3.0</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>com.google.guava</groupId>
    +					<artifactId>guava</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hadoop</groupId>
    +			<artifactId>hadoop-common</artifactId>
    +			<version>2.3.0</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hadoop</groupId>
    +			<artifactId>hadoop-common</artifactId>
    +			<version>2.3.0</version>
    +			<type>test-jar</type>
    +			<classifier>tests</classifier>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.spark</groupId>
    +			<artifactId>spark-core_${scala.binary.version}</artifactId>
    +			<version>1.0.0</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>org.eclipse.jetty.orbit</groupId>
    +					<artifactId>javax.servlet</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.spark</groupId>
    +			<artifactId>spark-streaming_${scala.binary.version}</artifactId>
    +			<version>1.0.0</version>
    +			<type>test-jar</type>
    +                        <classifier>tests</classifier>
    +			<scope>test</scope>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.spark</groupId>
    +			<artifactId>spark-streaming_${scala.binary.version}</artifactId>
    +			<version>1.0.0</version>
    +
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-client</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>io.netty</groupId>
    +					<artifactId>netty</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +
    +
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-common</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +			<type>test-jar</type>
    +                        <classifier>tests</classifier>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>javax.servlet.jsp</groupId>
    +					<artifactId>jsp-api</artifactId>
    +				</exclusion>
    +
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-common</artifactId>
    +			<version>0.98.1-hadoop2</version>
    --- End diff --
    
    These and the other HBase declarations should be controlled by a single version property, which already exists in the parent POM. Does this mean it won't work with Hadoop 1.x?



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15438431
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala ---
    @@ -0,0 +1,544 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.HBaseConfiguration
    +import org.apache.spark.rdd.RDD
    +import java.io.ByteArrayOutputStream
    +import java.io.DataOutputStream
    +import java.io.ByteArrayInputStream
    +import java.io.DataInputStream
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import org.apache.spark.api.java.JavaPairRDD
    +import java.io.OutputStream
    +import org.apache.hadoop.hbase.client.HTable
    +import org.apache.hadoop.hbase.client.Scan
    +import org.apache.hadoop.hbase.client.Get
    +import java.util.ArrayList
    +import org.apache.hadoop.hbase.client.Result
    +import scala.reflect.ClassTag
    +import org.apache.hadoop.hbase.client.HConnection
    +import org.apache.hadoop.hbase.client.Put
    +import org.apache.hadoop.hbase.client.Increment
    +import org.apache.hadoop.hbase.client.Delete
    +import org.apache.spark.SparkContext
    +import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    +import org.apache.hadoop.hbase.util.Bytes
    +import org.apache.hadoop.mapreduce.Job
    +import org.apache.hadoop.hbase.mapreduce.TableMapper
    +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
    +import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    +import org.apache.hadoop.hbase.util.Base64
    +import org.apache.hadoop.hbase.mapreduce.MutationSerialization
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization
    +import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization
    +import org.apache.spark.rdd.HadoopRDD
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.SerializableWritable
    +import java.util.HashMap
    +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos
    +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos
    +import java.util.concurrent.atomic.AtomicInteger
    +import org.apache.hadoop.hbase.HConstants
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import java.util.TimerTask
    +import org.apache.hadoop.hbase.client.Mutation
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * HBaseContext is a façade of simple and complex HBase operations
    + * like bulk put, get, increment, delete, and scan
    + *
    + * HBase Context will take the responsibilities to happen to
    + * complexity of disseminating the configuration information
    + * to the working and managing the life cycle of HConnections.
    + *
    + * First constructor:
    + *  @param sc - active SparkContext
    + *  @param broadcastedConf - This is a Broadcast object that holds a
    + * serializable Configuration object
    + *
    + */
    +@serializable class HBaseContext(@transient sc: SparkContext,
    +  broadcastedConf: Broadcast[SerializableWritable[Configuration]]) {
    +
    +  /**
    +   * Second constructor option:
    +   *  @param sc     active SparkContext
    +   *  @param config Configuration object to make connection to HBase
    +   */
    +  def this(@transient sc: SparkContext, @transient config: Configuration) {
    +    this(sc, sc.broadcast(new SerializableWritable(config)))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD foreachPartition.
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param RDD[t]  Original RDD with data to iterate over
    --- End diff --
    
    The param name is `rdd` rather than `RDD[t]`. I don't think scaladoc / javadoc will parse that.
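
    For illustration, the tags would need to reference the actual parameter
    names for scaladoc to pick them up, along these lines (the signature is
    copied from the diff above; the description wording is only a suggestion):

        /**
         * A simple enrichment of the traditional Spark RDD foreachPartition.
         *
         * @param rdd the RDD whose partitions will be iterated over
         * @param f   function given an Iterator over a partition's values and an
         *            already-connected HConnection with which to talk to HBase
         */
        def foreachPartition[T](rdd: RDD[T],
          f: (Iterator[T], HConnection) => Unit) = {
          rdd.foreachPartition(
            it => hbaseForeachPartition(broadcastedConf, it, f))
        }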



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15437185
  
    --- Diff: external/hbase/pom.xml ---
    @@ -0,0 +1,217 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    --- End diff --
    
    Add Apache license headers



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1608#issuecomment-50417100
  
    QA tests have started for PR 1608. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17316/consoleFull



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by saintstack <gi...@git.apache.org>.
Github user saintstack commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r16607538
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala ---
    @@ -0,0 +1,538 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.HBaseConfiguration
    +import org.apache.spark.rdd.RDD
    +import java.io.ByteArrayOutputStream
    +import java.io.DataOutputStream
    +import java.io.ByteArrayInputStream
    +import java.io.DataInputStream
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import org.apache.spark.api.java.JavaPairRDD
    +import java.io.OutputStream
    +import org.apache.hadoop.hbase.client.HTable
    +import org.apache.hadoop.hbase.client.Scan
    +import org.apache.hadoop.hbase.client.Get
    +import java.util.ArrayList
    +import org.apache.hadoop.hbase.client.Result
    +import scala.reflect.ClassTag
    +import org.apache.hadoop.hbase.client.HConnection
    +import org.apache.hadoop.hbase.client.Put
    +import org.apache.hadoop.hbase.client.Increment
    +import org.apache.hadoop.hbase.client.Delete
    +import org.apache.spark.SparkContext
    +import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    +import org.apache.hadoop.hbase.util.Bytes
    +import org.apache.hadoop.mapreduce.Job
    +import org.apache.hadoop.hbase.mapreduce.TableMapper
    +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
    +import org.apache.hadoop.hbase.util.Base64
    +import org.apache.spark.rdd.HadoopRDD
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.SerializableWritable
    +import java.util.HashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +import org.apache.hadoop.hbase.HConstants
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import java.util.TimerTask
    +import org.apache.hadoop.hbase.client.Mutation
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * HBaseContext is a façade of simple and complex HBase operations
    + * like bulk put, get, increment, delete, and scan
    + *
    + * HBase Context will take the responsibilities to happen to
    + * complexity of disseminating the configuration information
    + * to the working and managing the life cycle of HConnections.
    + *
    + * First constructor:
    + *  @param sc              Active SparkContext
    + *  @param broadcastedConf This is a Broadcast object that holds a
    + * serializable Configuration object
    + *
    + */
    +@serializable class HBaseContext(@transient sc: SparkContext,
    +  broadcastedConf: Broadcast[SerializableWritable[Configuration]]) {
    +
    +  /**
    +   * Second constructor option:
    +   *  @param sc     Active SparkContext
    +   *  @param config Configuration object to make connection to HBase
    +   */
    +  def this(@transient sc: SparkContext, @transient config: Configuration) {
    +    this(sc, sc.broadcast(new SerializableWritable(config)))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD foreachPartition.
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param RDD  Original RDD with data to iterate over
    +   * @param f    Function to be given a iterator to iterate through
    +   *             the RDD values and a HConnection object to interact 
    +   *             with HBase 
    +   */
    +  def foreachPartition[T](rdd: RDD[T],
    +    f: (Iterator[T], HConnection) => Unit) = {
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition(broadcastedConf, it, f))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark Streaming dStream foreach
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param DStream     Original DStream with data to iterate over
    +   * @param f           Function to be given a iterator to iterate through
    +   *                    the DStream values and a HConnection object to 
    +   *                    interact with HBase 
    +   */
    +  def streamForeach[T](dstream: DStream[T],
    +    f: (Iterator[T], HConnection) => Unit) = {
    +    dstream.foreach((rdd, time) => {
    +      foreachPartition(rdd, f)
    +    })
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD mapPartition.
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * Note: Make sure to partition correctly to avoid memory issue when
    +   *       getting data from HBase
    +   * 
    +   * @param RDD     Original RDD with data to iterate over
    +   * @param mp      Function to be given a iterator to iterate through
    +   *                the RDD values and a HConnection object to interact 
    +   *                with HBase
    +   * @return        Returns a new RDD generated by the user definition
    +   *                function just like normal mapPartition
    +   */
    +  def mapPartition[T, U: ClassTag](rdd: RDD[T],
    +    mp: (Iterator[T], HConnection) => Iterator[U]): RDD[U] = {
    +
    +    rdd.mapPartitions[U](it => hbaseMapPartition[T, U](broadcastedConf,
    +      it,
    +      mp), true)
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark Streaming DStream
    +   * mapPartition.
    +   * 
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * Note: Make sure to partition correctly to avoid memory issue when
    +   *       getting data from HBase
    +   * 
    +   * @param DStream    Original DStream with data to iterate over
    +   * @param mp         Function to be given a iterator to iterate through
    +   *                   the DStream values and a HConnection object to 
    +   *                   interact with HBase
    +   * @return           Returns a new DStream generated by the user 
    +   *                   definition function just like normal mapPartition
    +   */
    +  def streamMap[T, U: ClassTag](dstream: DStream[T],
    +    mp: (Iterator[T], HConnection) => Iterator[U]): DStream[U] = {
    +
    +    dstream.mapPartitions(it => hbaseMapPartition[T, U](broadcastedConf,
    +      it,
    +      mp), true)
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.foreachPartition method.
    +   * 
    +   * It allow addition support for a user to take RDD 
    +   * and generate puts and send them to HBase.  
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param RDD        Original RDD with data to iterate over
    +   * @param tableNm  The name of the table to put into 
    +   * @param f          Function to convert a value in the RDD to a HBase Put
    +   * @autoFlush        If autoFlush should be turned on
    +   */
    +  def bulkPut[T](rdd: RDD[T], tableNm: String, f: (T) => Put, autoFlush: Boolean) {
    +
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition[T](
    +        broadcastedConf,
    +        it,
    +        (iterator, hConnection) => {
    +          val htable = hConnection.getTable(tableNm)
    +          htable.setAutoFlush(autoFlush, true)
    +          iterator.foreach(T => htable.put(f(T)))
    +          htable.flushCommits()
    +          htable.close()
    +        }))
    +  }
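    // Usage sketch (illustrative only, not part of this patch): bulk-writing an
    // RDD of (rowKey, value) strings. Table "t1", family "c" and qualifier "q"
    // are assumptions; `sc` and `hbaseContext` are as in the sketch above.
    val toWrite = sc.parallelize(Seq(("row1", "value1"), ("row2", "value2")))
    hbaseContext.bulkPut[(String, String)](toWrite, "t1",
      (kv) => new Put(Bytes.toBytes(kv._1))
        .add(Bytes.toBytes("c"), Bytes.toBytes("q"), Bytes.toBytes(kv._2)),
      autoFlush = false)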
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.streamMapPartition method.
    +   * 
    +   * It allow addition support for a user to take a DStream and 
    +   * generate puts and send them to HBase.  
    +   * 
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param DStream    Original DStream with data to iterate over
    +   * @param tableNm  The name of the table to put into 
    +   * @param f          Function to convert a value in the RDD to a HBase Put
    +   * @autoFlush        If autoFlush should be turned on
    +   */
    +  def streamBulkPut[T](dstream: DStream[T],
    +    tableNm: String,
    +    f: (T) => Put,
    +    autoFlush: Boolean) = {
    +    dstream.foreach((rdd, time) => {
    +      bulkPut(rdd, tableNm, f, autoFlush)
    +    })
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.foreachPartition method.
    +   * 
    +   * It allow addition support for a user to take a RDD and 
    +   * generate increments and send them to HBase.  
    +   * 
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param RDD        Original RDD with data to iterate over
    +   * @param tableNm  The name of the table to increment to 
    +   * @param f          Function to convert a value in the RDD to a 
    +   *                   HBase Increments
    +   * @batchSize        The number of increments to batch before sending to HBase
    +   */
    +  def bulkIncrement[T](rdd: RDD[T], tableNm:String, f:(T) => Increment, batchSize: Int) {
    +    bulkMutation(rdd, tableNm, f, batchSize)
    +  }
    +  
    +  /**
    +   * A simple abstraction over the HBaseContext.foreachPartition method.
    +   * 
    +   * It allow addition support for a user to take a RDD and generate delete
    +   * and send them to HBase.  The complexity of even the HConnection is 
    +   * removed from the developer
    +   *  
    +   * @param RDD     Original RDD with data to iterate over
    +   * @param tableNm     The name of the table to delete from 
    +   * @param f       function to convert a value in the RDD to a 
    +   *                HBase Deletes
    +   * @batchSize     The number of delete to batch before sending to HBase
    +   */
    +  def bulkDelete[T](rdd: RDD[T], tableNm:String, f:(T) => Delete, batchSize: Int) {
    +    bulkMutation(rdd, tableNm, f, batchSize)
    +  }
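    // Usage sketch (illustrative only, not part of this patch): the same RDD of
    // row keys can feed increments or deletes, batched 100 mutations at a time.
    // Table "t1", family "c" and qualifier "hits" are assumptions.
    val mutateKeys = sc.parallelize(Seq("row1", "row2")).map(s => Bytes.toBytes(s))
    hbaseContext.bulkIncrement[Array[Byte]](mutateKeys, "t1",
      (row) => new Increment(row).addColumn(Bytes.toBytes("c"), Bytes.toBytes("hits"), 1L),
      batchSize = 100)
    hbaseContext.bulkDelete[Array[Byte]](mutateKeys, "t1",
      (row) => new Delete(row),
      batchSize = 100)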
    +  
    +  /** 
    +   *  Under lining function to support all bulk mutations
    +   *  
    +   *  May be opened up if requested
    +   */
    +  private def bulkMutation[T](rdd: RDD[T], tableNm: String, f: (T) => Mutation, batchSize: Int) {
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition[T](
    +        broadcastedConf,
    +        it,
    +        (iterator, hConnection) => {
    +          val htable = hConnection.getTable(tableNm)
    +          val mutationList = new ArrayList[Mutation]
    +          iterator.foreach(T => {
    +            mutationList.add(f(T))
    +            if (mutationList.size >= batchSize) {
    +              htable.batch(mutationList)
    +              mutationList.clear()
    +            }
    +          })
    +          if (mutationList.size() > 0) {
    +            htable.batch(mutationList)
    +            mutationList.clear()
    +          }
    +          htable.close()
    +        }))
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.streamForeach method.
    +   * 
    +   * It allow addition support for a user to take a DStream and 
    +   * generate Increments and send them to HBase.  
    +   * 
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param DStream    Original DStream with data to iterate over
    +   * @param tableNm  The name of the table to increments into 
    +   * @param f          function to convert a value in the RDD to a 
    +   *                   HBase Increments
    +   * @batchSize        The number of increments to batch before sending to HBase
    +   */
    +  def streamBulkIncrement[T](dstream: DStream[T],
    +    tableNm: String,
    +    f: (T) => Increment,
    +    batchSize: Int) = {
    +    streamBulkMutation(dstream, tableNm, f, batchSize)
    +  }
    +  
    +  /**
    +   * A simple abstraction over the HBaseContext.streamForeach method.
    +   * 
    +   * It allow addition support for a user to take a DStream and 
    +   * generate Delete and send them to HBase.  
    +   * 
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param DStream   Original DStream with data to iterate over
    +   * @param tableNm The name of the table to delete from 
    +   * @param f         function to convert a value in the RDD to a 
    +   *                  HBase Delete
    +   * @batchSize       The number of deletes to batch before sending to HBase
    +   */
    +  def streamBulkDelete[T](dstream: DStream[T],
    +    tableNm: String,
    +    f: (T) => Delete,
    +    batchSize: Int) = {
    +    streamBulkMutation(dstream, tableNm, f, batchSize)
    +  }
    +  
    +  /** 
    +   *  Under lining function to support all bulk streaming mutations
    +   *  
    +   *  May be opened up if requested
    +   */
    +  private def streamBulkMutation[T](dstream: DStream[T],
    +    tableNm: String,
    +    f: (T) => Mutation,
    +    batchSize: Int) = {
    +    dstream.foreach((rdd, time) => {
    +      bulkMutation(rdd, tableNm, f, batchSize)
    +    })
    +  }
    +
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.mapPartition method.
    +   * 
    +   * It allow addition support for a user to take a RDD and generates a
    +   * new RDD based on Gets and the results they bring back from HBase
    +   * 
    +   * @param RDD        Original RDD with data to iterate over
    +   * @param tableNm  The name of the table to get from 
    +   * @param makeGet    Function to convert a value in the RDD to a 
    +   *                   HBase Get
    +   * @param convertResult This will convert the HBase Result object to 
    +   *                   what ever the user wants to put in the resulting 
    +   *                   RDD
    +   * return            New RDD that is created by the Get to HBase
    +   */
    +  def bulkGet[T, U: ClassTag](tableNm: String,
    +    batchSize: Int,
    +    rdd: RDD[T],
    +    makeGet: (T) => Get,
    +    convertResult: (Result) => U): RDD[U] = {
    +
    +    val getMapPartition = new GetMapPartition(tableNm,
    +      batchSize,
    +      makeGet,
    +      convertResult)
    +
    +    rdd.mapPartitions[U](it => hbaseMapPartition[T, U](broadcastedConf,
    +      it,
    +      getMapPartition.run), true)
    +  }
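    // Usage sketch (illustrative only, not part of this patch): turning an RDD
    // of row keys into an RDD of (rowKey, value) pairs via bulkGet. Table "t1",
    // family "c" and qualifier "q" are assumptions.
    val getKeys = sc.parallelize(Seq("row1", "row2")).map(s => Bytes.toBytes(s))
    val fetched: RDD[(Array[Byte], Array[Byte])] =
      hbaseContext.bulkGet[Array[Byte], (Array[Byte], Array[Byte])]("t1", 100, getKeys,
        (row) => new Get(row),
        (result) => (result.getRow, result.getValue(Bytes.toBytes("c"), Bytes.toBytes("q"))))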
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.streamMap method.
    +   * 
    +   * It allow addition support for a user to take a DStream and 
    +   * generates a new DStream based on Gets and the results 
    +   * they bring back from HBase
    +   * 
    +   * @param DStream    Original DStream with data to iterate over
    +   * @param tableNm  The name of the table to get from 
    +   * @param makeGet    Function to convert a value in the DStream to a 
    +   *                   HBase Get
    +   * @param convertResult This will convert the HBase Result object to 
    +   *                   what ever the user wants to put in the resulting 
    +   *                   DStream
    +   * return            new DStream that is created by the Get to HBase    
    +   */
    +  def streamBulkGet[T, U: ClassTag](tableNm: String,
    +      batchSize:Int,
    +      dstream: DStream[T],
    +      makeGet: (T) => Get, 
    +      convertResult: (Result) => U): DStream[U] = {
    +
    +    val getMapPartition = new GetMapPartition(tableNm,
    +      batchSize,
    +      makeGet,
    +      convertResult)
    +
    +    dstream.mapPartitions[U](it => hbaseMapPartition[T, U](broadcastedConf,
    +      it,
    +      getMapPartition.run), true)
    +  }
    +
    +  /**
    +   * This function will use the native HBase TableInputFormat with the 
    +   * given scan object to generate a new RDD
    +   * 
    +   *  @param tableNm The name of the table to scan
    +   *  @param scan      The HBase scan object to use to read data from HBase
    +   *  @param f         Function to convert a Result object from HBase into 
    +   *                   what the user wants in the final generated RDD
    +   *  @return          New RDD with results from scan 
    +   */
    +  def hbaseRDD[U: ClassTag](tableNm: String, scan: Scan, f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {
    +
    +    var job: Job = new Job(broadcastedConf.value.value)
    +
    +    TableMapReduceUtil.initTableMapperJob(tableNm, scan, classOf[IdentityTableMapper], null, null, job)
    +
    +    sc.newAPIHadoopRDD(job.getConfiguration(),
    +      classOf[TableInputFormat],
    +      classOf[ImmutableBytesWritable],
    +      classOf[Result]).map(f)
    +  }
    +
    +  /**
    +   * A overloaded version of HBaseContext hbaseRDD that predefines the 
    +   * type of the outputing RDD
    +   * 
    +   *  @param tableNm The name of the table to scan
    +   *  @param scan      The HBase scan object to use to read data from HBase
    +   *  @return          New RDD with results from scan 
    +   * 
    +   */
    +  def hbaseRDD(tableNm: String, scans: Scan): RDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = {
    +    hbaseRDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])](
    +      tableNm,
    +      scans,
    +      (r: (ImmutableBytesWritable, Result)) => {
    +        val it = r._2.list().iterator()
    +        val list = new ArrayList[(Array[Byte], Array[Byte], Array[Byte])]()
    +
    +        while (it.hasNext()) {
    +          val kv = it.next()
    +          list.add((kv.getFamily(), kv.getQualifier(), kv.getValue()))
    +        }
    +
    +        (r._1.copyBytes(), list)
    +      })
    +  }
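    // Usage sketch (illustrative only, not part of this patch): scanning an
    // assumed table "t1", restricted to family "c", with client-side caching.
    val scan = new Scan()
    scan.addFamily(Bytes.toBytes("c"))
    scan.setCaching(100)
    val scanned = hbaseContext.hbaseRDD("t1", scan)
    scanned.foreach { case (rowKey, cells) =>
      println(Bytes.toString(rowKey) + " has " + cells.size() + " cells")
    }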
    +  
    +  /** 
    +   *  Under lining wrapper all foreach functions in HBaseContext
    +   *  
    +   */
    +  private def hbaseForeachPartition[T](configBroadcast: Broadcast[SerializableWritable[Configuration]],
    +    it: Iterator[T],
    +    f: (Iterator[T], HConnection) => Unit) = {
    +
    +    val config = configBroadcast.value.value
    +
    +    val hConnection = HConnectionStaticCache.getHConnection(config)
    +    try {
    +      f(it, hConnection)
    +    } finally {
    +      HConnectionStaticCache.finishWithHConnection(config, hConnection)
    +    }
    +  }
    +
    +  /** 
    +   *  Under lining wrapper all mapPartition functions in HBaseContext
    +   *  
    +   */
    +  private def hbaseMapPartition[K, U](configBroadcast: Broadcast[SerializableWritable[Configuration]],
    +    it: Iterator[K],
    +    mp: (Iterator[K], HConnection) => Iterator[U]): Iterator[U] = {
    +
    +    val config = configBroadcast.value.value
    +
    +    val hConnection = HConnectionStaticCache.getHConnection(config)
    +
    +    try {
    +      val res = mp(it, hConnection)
    +      res
    +    } finally {
    +      HConnectionStaticCache.finishWithHConnection(config, hConnection)
    +    }
    +  }
    +  
    +  /** 
    +   *  Under lining wrapper all get mapPartition functions in HBaseContext
    +   *  
    +   */
    +  @serializable private  class GetMapPartition[T, U: ClassTag](tableNm: String, 
    --- End diff --
    
    Again, pardon my ignorance, but out of interest, how is this @serializable implemented in scala-land?
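    I.e. is the annotation just shorthand for mixing in java.io.Serializable?
    Something along these lines (a sketch, not from the patch):
    
        @serializable class AnnotatedForm        // spelling used in this patch
        class MixinForm extends Serializable     // is this the equivalent?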




[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15437173
  
    --- Diff: external/hbase/pom.xml ---
    @@ -0,0 +1,217 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +      <modelVersion>4.0.0</modelVersion>
    +        <parent>
    +          <groupId>org.apache.spark</groupId>
    +          <artifactId>spark-parent</artifactId>
    +          <version>1.1.0-SNAPSHOT</version>
    +          <relativePath>../../pom.xml</relativePath>
    +        </parent>
    +
    +        <groupId>org.apache.spark</groupId>
    +        <artifactId>spark-hbase_2.10</artifactId>
    +        <properties>
    +           <sbt.project.name>spark-hbase</sbt.project.name>
    +        </properties>
    +        <packaging>jar</packaging>
    +        <name>Spark Project External Flume</name>
    +        <url>http://spark.apache.org/</url>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.scalatest</groupId>
    +			<artifactId>scalatest_${scala.binary.version}</artifactId>
    +			<version>2.2.0</version>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>com.google.guava</groupId>
    +			<artifactId>guava</artifactId>
    +			<version>14.0.1</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hadoop</groupId>
    +			<artifactId>hadoop-client</artifactId>
    +			<version>2.3.0</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>com.google.guava</groupId>
    +					<artifactId>guava</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hadoop</groupId>
    +			<artifactId>hadoop-common</artifactId>
    +			<version>2.3.0</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hadoop</groupId>
    +			<artifactId>hadoop-common</artifactId>
    +			<version>2.3.0</version>
    +			<type>test-jar</type>
    +			<classifier>tests</classifier>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.spark</groupId>
    +			<artifactId>spark-core_${scala.binary.version}</artifactId>
    +			<version>1.0.0</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>org.eclipse.jetty.orbit</groupId>
    +					<artifactId>javax.servlet</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.spark</groupId>
    +			<artifactId>spark-streaming_${scala.binary.version}</artifactId>
    +			<version>1.0.0</version>
    +			<type>test-jar</type>
    +                        <classifier>tests</classifier>
    +			<scope>test</scope>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.spark</groupId>
    +			<artifactId>spark-streaming_${scala.binary.version}</artifactId>
    +			<version>1.0.0</version>
    +
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-client</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>io.netty</groupId>
    +					<artifactId>netty</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +
    +
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-common</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +			<type>test-jar</type>
    +                        <classifier>tests</classifier>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>javax.servlet.jsp</groupId>
    +					<artifactId>jsp-api</artifactId>
    +				</exclusion>
    +
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-common</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>javax.servlet.jsp</groupId>
    +					<artifactId>jsp-api</artifactId>
    +				</exclusion>
    +
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-server</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>javax.servlet.jsp</groupId>
    +					<artifactId>jsp-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-server</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +			<type>test-jar</type>
    +                        <classifier>tests</classifier>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>javax.servlet.jsp</groupId>
    +					<artifactId>jsp-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>javax.servlet</groupId>
    +			<artifactId>javax.servlet-api</artifactId>
    +			<version>3.1.0</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-hadoop-compat</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-hadoop-compat</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +			<type>test-jar</type>
    +			<classifier>tests</classifier>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-hadoop2-compat</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-hadoop2-compat</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +			<type>test-jar</type>
    +			<classifier>tests</classifier>
    +		</dependency>
    +	</dependencies>
    +	<build>
    +	    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    +	    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    +	    <plugins>
    +	      <plugin>
    +	        <groupId>org.scalatest</groupId>
    +	        <artifactId>scalatest-maven-plugin</artifactId>
    +	      </plugin>
    +	    </plugins>
    +        </build>
    +
    +</project>
    --- End diff --
    
    Add a blank line.



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by saintstack <gi...@git.apache.org>.
Github user saintstack commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r16606905
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala ---
    @@ -0,0 +1,538 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.HBaseConfiguration
    +import org.apache.spark.rdd.RDD
    +import java.io.ByteArrayOutputStream
    +import java.io.DataOutputStream
    +import java.io.ByteArrayInputStream
    +import java.io.DataInputStream
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import org.apache.spark.api.java.JavaPairRDD
    +import java.io.OutputStream
    +import org.apache.hadoop.hbase.client.HTable
    +import org.apache.hadoop.hbase.client.Scan
    +import org.apache.hadoop.hbase.client.Get
    +import java.util.ArrayList
    +import org.apache.hadoop.hbase.client.Result
    +import scala.reflect.ClassTag
    +import org.apache.hadoop.hbase.client.HConnection
    +import org.apache.hadoop.hbase.client.Put
    +import org.apache.hadoop.hbase.client.Increment
    +import org.apache.hadoop.hbase.client.Delete
    +import org.apache.spark.SparkContext
    +import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    +import org.apache.hadoop.hbase.util.Bytes
    +import org.apache.hadoop.mapreduce.Job
    +import org.apache.hadoop.hbase.mapreduce.TableMapper
    +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
    +import org.apache.hadoop.hbase.util.Base64
    +import org.apache.spark.rdd.HadoopRDD
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.SerializableWritable
    +import java.util.HashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +import org.apache.hadoop.hbase.HConstants
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import java.util.TimerTask
    +import org.apache.hadoop.hbase.client.Mutation
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * HBaseContext is a façade of simple and complex HBase operations
    + * like bulk put, get, increment, delete, and scan
    + *
    + * HBase Context will take the responsibilities to happen to
    + * complexity of disseminating the configuration information
    + * to the working and managing the life cycle of HConnections.
    + *
    + * First constructor:
    + *  @param sc              Active SparkContext
    + *  @param broadcastedConf This is a Broadcast object that holds a
    + * serializable Configuration object
    + *
    + */
    +@serializable class HBaseContext(@transient sc: SparkContext,
    +  broadcastedConf: Broadcast[SerializableWritable[Configuration]]) {
    +
    +  /**
    +   * Second constructor option:
    +   *  @param sc     Active SparkContext
    +   *  @param config Configuration object to make connection to HBase
    +   */
    +  def this(@transient sc: SparkContext, @transient config: Configuration) {
    +    this(sc, sc.broadcast(new SerializableWritable(config)))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD foreachPartition.
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param RDD  Original RDD with data to iterate over
    +   * @param f    Function to be given a iterator to iterate through
    +   *             the RDD values and a HConnection object to interact 
    +   *             with HBase 
    +   */
    +  def foreachPartition[T](rdd: RDD[T],
    +    f: (Iterator[T], HConnection) => Unit) = {
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition(broadcastedConf, it, f))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark Streaming dStream foreach
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param DStream     Original DStream with data to iterate over
    +   * @param f           Function to be given a iterator to iterate through
    +   *                    the DStream values and a HConnection object to 
    +   *                    interact with HBase 
    +   */
    +  def streamForeach[T](dstream: DStream[T],
    +    f: (Iterator[T], HConnection) => Unit) = {
    +    dstream.foreach((rdd, time) => {
    +      foreachPartition(rdd, f)
    +    })
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD mapPartition.
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * Note: Make sure to partition correctly to avoid memory issue when
    +   *       getting data from HBase
    +   * 
    +   * @param RDD     Original RDD with data to iterate over
    +   * @param mp      Function to be given a iterator to iterate through
    +   *                the RDD values and a HConnection object to interact 
    +   *                with HBase
    +   * @return        Returns a new RDD generated by the user definition
    +   *                function just like normal mapPartition
    +   */
    +  def mapPartition[T, U: ClassTag](rdd: RDD[T],
    +    mp: (Iterator[T], HConnection) => Iterator[U]): RDD[U] = {
    +
    +    rdd.mapPartitions[U](it => hbaseMapPartition[T, U](broadcastedConf,
    +      it,
    +      mp), true)
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark Streaming DStream
    +   * mapPartition.
    +   * 
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * Note: Make sure to partition correctly to avoid memory issue when
    +   *       getting data from HBase
    +   * 
    +   * @param DStream    Original DStream with data to iterate over
    +   * @param mp         Function to be given a iterator to iterate through
    +   *                   the DStream values and a HConnection object to 
    +   *                   interact with HBase
    +   * @return           Returns a new DStream generated by the user 
    +   *                   definition function just like normal mapPartition
    +   */
    +  def streamMap[T, U: ClassTag](dstream: DStream[T],
    +    mp: (Iterator[T], HConnection) => Iterator[U]): DStream[U] = {
    +
    +    dstream.mapPartitions(it => hbaseMapPartition[T, U](broadcastedConf,
    +      it,
    +      mp), true)
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.foreachPartition method.
    +   * 
    +   * It allow addition support for a user to take RDD 
    +   * and generate puts and send them to HBase.  
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param RDD        Original RDD with data to iterate over
    +   * @param tableNm  The name of the table to put into 
    +   * @param f          Function to convert a value in the RDD to a HBase Put
    +   * @autoFlush        If autoFlush should be turned on
    +   */
    +  def bulkPut[T](rdd: RDD[T], tableNm: String, f: (T) => Put, autoFlush: Boolean) {
    +
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition[T](
    +        broadcastedConf,
    +        it,
    +        (iterator, hConnection) => {
    +          val htable = hConnection.getTable(tableNm)
    +          htable.setAutoFlush(autoFlush, true)
    +          iterator.foreach(T => htable.put(f(T)))
    +          htable.flushCommits()
    +          htable.close()
    +        }))
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.streamMapPartition method.
    +   * 
    +   * It allow addition support for a user to take a DStream and 
    +   * generate puts and send them to HBase.  
    +   * 
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param DStream    Original DStream with data to iterate over
    +   * @param tableNm  The name of the table to put into 
    +   * @param f          Function to convert a value in the RDD to a HBase Put
    +   * @autoFlush        If autoFlush should be turned on
    --- End diff --
    
    I wonder if users will care about auto flush enough that it should show up in the API? Rather, for those who care, could they not set it in the passed-in configuration rather than on a per-method basis? Just a thought if you are looking to simplify.
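    For instance, something along these lines inside bulkPut (a sketch only; the
    property name below is made up):
    
        // read the flag from the broadcast Configuration instead of the API:
        val autoFlush = broadcastedConf.value.value.getBoolean("spark.hbase.bulkput.autoflush", false)
        htable.setAutoFlush(autoFlush, true)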




[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by saintstack <gi...@git.apache.org>.
Github user saintstack commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r16607037
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala ---
    @@ -0,0 +1,538 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.HBaseConfiguration
    +import org.apache.spark.rdd.RDD
    +import java.io.ByteArrayOutputStream
    +import java.io.DataOutputStream
    +import java.io.ByteArrayInputStream
    +import java.io.DataInputStream
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import org.apache.spark.api.java.JavaPairRDD
    +import java.io.OutputStream
    +import org.apache.hadoop.hbase.client.HTable
    +import org.apache.hadoop.hbase.client.Scan
    +import org.apache.hadoop.hbase.client.Get
    +import java.util.ArrayList
    +import org.apache.hadoop.hbase.client.Result
    +import scala.reflect.ClassTag
    +import org.apache.hadoop.hbase.client.HConnection
    +import org.apache.hadoop.hbase.client.Put
    +import org.apache.hadoop.hbase.client.Increment
    +import org.apache.hadoop.hbase.client.Delete
    +import org.apache.spark.SparkContext
    +import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    +import org.apache.hadoop.hbase.util.Bytes
    +import org.apache.hadoop.mapreduce.Job
    +import org.apache.hadoop.hbase.mapreduce.TableMapper
    +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
    +import org.apache.hadoop.hbase.util.Base64
    +import org.apache.spark.rdd.HadoopRDD
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.SerializableWritable
    +import java.util.HashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +import org.apache.hadoop.hbase.HConstants
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import java.util.TimerTask
    +import org.apache.hadoop.hbase.client.Mutation
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * HBaseContext is a façade of simple and complex HBase operations
    + * like bulk put, get, increment, delete, and scan
    + *
    + * HBase Context will take the responsibilities to happen to
    + * complexity of disseminating the configuration information
    + * to the working and managing the life cycle of HConnections.
    + *
    + * First constructor:
    + *  @param sc              Active SparkContext
    + *  @param broadcastedConf This is a Broadcast object that holds a
    + * serializable Configuration object
    + *
    + */
    +@serializable class HBaseContext(@transient sc: SparkContext,
    +  broadcastedConf: Broadcast[SerializableWritable[Configuration]]) {
    +
    +  /**
    +   * Second constructor option:
    +   *  @param sc     Active SparkContext
    +   *  @param config Configuration object to make connection to HBase
    +   */
    +  def this(@transient sc: SparkContext, @transient config: Configuration) {
    +    this(sc, sc.broadcast(new SerializableWritable(config)))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD foreachPartition.
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param RDD  Original RDD with data to iterate over
    +   * @param f    Function to be given a iterator to iterate through
    +   *             the RDD values and a HConnection object to interact 
    +   *             with HBase 
    +   */
    +  def foreachPartition[T](rdd: RDD[T],
    +    f: (Iterator[T], HConnection) => Unit) = {
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition(broadcastedConf, it, f))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark Streaming dStream foreach
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param DStream     Original DStream with data to iterate over
    +   * @param f           Function to be given a iterator to iterate through
    +   *                    the DStream values and a HConnection object to 
    +   *                    interact with HBase 
    +   */
    +  def streamForeach[T](dstream: DStream[T],
    +    f: (Iterator[T], HConnection) => Unit) = {
    +    dstream.foreach((rdd, time) => {
    +      foreachPartition(rdd, f)
    +    })
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD mapPartition.
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * Note: Make sure to partition correctly to avoid memory issue when
    +   *       getting data from HBase
    +   * 
    +   * @param RDD     Original RDD with data to iterate over
    +   * @param mp      Function to be given a iterator to iterate through
    +   *                the RDD values and a HConnection object to interact 
    +   *                with HBase
    +   * @return        Returns a new RDD generated by the user definition
    +   *                function just like normal mapPartition
    +   */
    +  def mapPartition[T, U: ClassTag](rdd: RDD[T],
    +    mp: (Iterator[T], HConnection) => Iterator[U]): RDD[U] = {
    +
    +    rdd.mapPartitions[U](it => hbaseMapPartition[T, U](broadcastedConf,
    +      it,
    +      mp), true)
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark Streaming DStream
    +   * mapPartition.
    +   * 
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * Note: Make sure to partition correctly to avoid memory issue when
    +   *       getting data from HBase
    +   * 
    +   * @param DStream    Original DStream with data to iterate over
    +   * @param mp         Function to be given a iterator to iterate through
    +   *                   the DStream values and a HConnection object to 
    +   *                   interact with HBase
    +   * @return           Returns a new DStream generated by the user 
    +   *                   definition function just like normal mapPartition
    +   */
    +  def streamMap[T, U: ClassTag](dstream: DStream[T],
    +    mp: (Iterator[T], HConnection) => Iterator[U]): DStream[U] = {
    +
    +    dstream.mapPartitions(it => hbaseMapPartition[T, U](broadcastedConf,
    +      it,
    +      mp), true)
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.foreachPartition method.
    +   * 
    +   * It allow addition support for a user to take RDD 
    +   * and generate puts and send them to HBase.  
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param RDD        Original RDD with data to iterate over
    +   * @param tableNm  The name of the table to put into 
    +   * @param f          Function to convert a value in the RDD to a HBase Put
    +   * @autoFlush        If autoFlush should be turned on
    +   */
    +  def bulkPut[T](rdd: RDD[T], tableNm: String, f: (T) => Put, autoFlush: Boolean) {
    +
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition[T](
    +        broadcastedConf,
    +        it,
    +        (iterator, hConnection) => {
    +          val htable = hConnection.getTable(tableNm)
    +          htable.setAutoFlush(autoFlush, true)
    +          iterator.foreach(T => htable.put(f(T)))
    +          htable.flushCommits()
    +          htable.close()
    +        }))
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.streamMapPartition method.
    +   * 
    +   * It allow addition support for a user to take a DStream and 
    +   * generate puts and send them to HBase.  
    +   * 
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param DStream    Original DStream with data to iterate over
    +   * @param tableNm  The name of the table to put into 
    +   * @param f          Function to convert a value in the RDD to a HBase Put
    +   * @autoFlush        If autoFlush should be turned on
    +   */
    +  def streamBulkPut[T](dstream: DStream[T],
    +    tableNm: String,
    +    f: (T) => Put,
    +    autoFlush: Boolean) = {
    +    dstream.foreach((rdd, time) => {
    +      bulkPut(rdd, tableNm, f, autoFlush)
    +    })
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.foreachPartition method.
    +   * 
    +   * It allow addition support for a user to take a RDD and 
    +   * generate increments and send them to HBase.  
    +   * 
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param RDD        Original RDD with data to iterate over
    +   * @param tableNm  The name of the table to increment to 
    +   * @param f          Function to convert a value in the RDD to a 
    +   *                   HBase Increments
    +   * @batchSize        The number of increments to batch before sending to HBase
    +   */
    +  def bulkIncrement[T](rdd: RDD[T], tableNm:String, f:(T) => Increment, batchSize: Int) {
    +    bulkMutation(rdd, tableNm, f, batchSize)
    +  }
    +  
    +  /**
    +   * A simple abstraction over the HBaseContext.foreachPartition method.
    +   * 
    +   * It allow addition support for a user to take a RDD and generate delete
    +   * and send them to HBase.  The complexity of even the HConnection is 
    +   * removed from the developer
    +   *  
    +   * @param RDD     Original RDD with data to iterate over
    +   * @param tableNm     The name of the table to delete from 
    +   * @param f       function to convert a value in the RDD to a 
    +   *                HBase Deletes
    +   * @batchSize     The number of delete to batch before sending to HBase
    +   */
    +  def bulkDelete[T](rdd: RDD[T], tableNm:String, f:(T) => Delete, batchSize: Int) {
    +    bulkMutation(rdd, tableNm, f, batchSize)
    +  }
    +  
    +  /** 
    +   *  Under lining function to support all bulk mutations
    +   *  
    +   *  May be opened up if requested
    +   */
    +  private def bulkMutation[T](rdd: RDD[T], tableNm: String, f: (T) => Mutation, batchSize: Int) {
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition[T](
    +        broadcastedConf,
    +        it,
    +        (iterator, hConnection) => {
    +          val htable = hConnection.getTable(tableNm)
    +          val mutationList = new ArrayList[Mutation]
    +          iterator.foreach(T => {
    +            mutationList.add(f(T))
    +            if (mutationList.size >= batchSize) {
    +              htable.batch(mutationList)
    +              mutationList.clear()
    +            }
    +          })
    +          if (mutationList.size() > 0) {
    +            htable.batch(mutationList)
    +            mutationList.clear()
    +          }
    +          htable.close()
    +        }))
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.streamForeach method.
    +   * 
    +   * It allow addition support for a user to take a DStream and 
    +   * generate Increments and send them to HBase.  
    +   * 
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param DStream    Original DStream with data to iterate over
    +   * @param tableNm  The name of the table to increments into 
    +   * @param f          function to convert a value in the RDD to a 
    +   *                   HBase Increments
    +   * @batchSize        The number of increments to batch before sending to HBase
    +   */
    +  def streamBulkIncrement[T](dstream: DStream[T],
    +    tableNm: String,
    +    f: (T) => Increment,
    +    batchSize: Int) = {
    +    streamBulkMutation(dstream, tableNm, f, batchSize)
    +  }
    +  
    +  /**
    +   * A simple abstraction over the HBaseContext.streamForeach method.
    +   * 
    +   * It allow addition support for a user to take a DStream and 
    --- End diff --
    
    'additional'




[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by saintstack <gi...@git.apache.org>.
Github user saintstack commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r16606346
  
    --- Diff: external/hbase/pom.xml ---
    @@ -0,0 +1,140 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    + ~ Licensed to the Apache Software Foundation (ASF) under one or more
    + ~ contributor license agreements. See the NOTICE file distributed with
    + ~ this work for additional information regarding copyright ownership.
    + ~ The ASF licenses this file to You under the Apache License, Version 2.0
    + ~ (the "License"); you may not use this file except in compliance with
    + ~ the License. You may obtain a copy of the License at
    + ~
    + ~  http://www.apache.org/licenses/LICENSE-2.0
    + ~
    + ~ Unless required by applicable law or agreed to in writing, software
    + ~ distributed under the License is distributed on an "AS IS" BASIS,
    + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + ~ See the License for the specific language governing permissions and
    + ~ limitations under the License.
    + -->
    + 
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    + <modelVersion>4.0.0</modelVersion>
    +  <parent>
    +   <groupId>org.apache.spark</groupId>
    +   <artifactId>spark-parent</artifactId>
    +   <version>1.1.0-SNAPSHOT</version>
    +   <relativePath>../../pom.xml</relativePath>
    +  </parent>
    +
    +  <groupId>org.apache.spark</groupId>
    +  <artifactId>spark-hbase_2.10</artifactId>
    +  <properties>
    +    <sbt.project.name>spark-hbase</sbt.project.name>
    +  </properties>
    +  <packaging>jar</packaging>
    +  <name>Spark Project External Flume</name>
    +  <url>http://spark.apache.org/</url>
    +
    +  <dependencies>
    +    <dependency>
    +      <groupId>org.scalatest</groupId>
    +      <artifactId>scalatest_${scala.binary.version}</artifactId>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.spark</groupId>
    +      <artifactId>spark-core_${scala.binary.version}</artifactId>
    +      <version>${project.version}</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.spark</groupId>
    +      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
    +      <version>${project.version}</version>
    +      <type>test-jar</type>
    +      <classifier>tests</classifier>
    +      <scope>test</scope>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.spark</groupId>
    +      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
    +      <version>${project.version}</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.hbase</groupId>
    +      <artifactId>hbase-client</artifactId>
    +      <version>${hbase-new.version}</version>
    --- End diff --
    
    What does the '-new' refer to?




[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1608#issuecomment-50915104
  
    QA results for PR 1608:
    - This patch FAILED unit tests.
    - This patch merges cleanly.
    - This patch adds the following public classes (experimental):
        @serializable class HBaseContext(@transient sc: SparkContext,
        protected class hconnectionCleanerTask extends TimerTask {
    
    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17682/consoleFull



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/1608#issuecomment-50693942
  
    Just FYI - there is already an outstanding patch and JIRA for HBase support on Spark:
    https://github.com/apache/spark/pull/194
    https://issues.apache.org/jira/browse/SPARK-1127



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by saintstack <gi...@git.apache.org>.
Github user saintstack commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r16606170
  
    --- Diff: external/hbase/pom.xml ---
    @@ -0,0 +1,140 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    + ~ Licensed to the Apache Software Foundation (ASF) under one or more
    + ~ contributor license agreements. See the NOTICE file distributed with
    + ~ this work for additional information regarding copyright ownership.
    + ~ The ASF licenses this file to You under the Apache License, Version 2.0
    + ~ (the "License"); you may not use this file except in compliance with
    + ~ the License. You may obtain a copy of the License at
    + ~
    + ~  http://www.apache.org/licenses/LICENSE-2.0
    + ~
    + ~ Unless required by applicable law or agreed to in writing, software
    + ~ distributed under the License is distributed on an "AS IS" BASIS,
    + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + ~ See the License for the specific language governing permissions and
    + ~ limitations under the License.
    + -->
    + 
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    + <modelVersion>4.0.0</modelVersion>
    +  <parent>
    +   <groupId>org.apache.spark</groupId>
    +   <artifactId>spark-parent</artifactId>
    +   <version>1.1.0-SNAPSHOT</version>
    +   <relativePath>../../pom.xml</relativePath>
    +  </parent>
    +
    +  <groupId>org.apache.spark</groupId>
    +  <artifactId>spark-hbase_2.10</artifactId>
    --- End diff --
    
    Pardon my ignorance, but what is the _2.10 about?




[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15438418
  
    --- Diff: external/hbase/pom.xml ---
    @@ -0,0 +1,217 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +      <modelVersion>4.0.0</modelVersion>
    +        <parent>
    +          <groupId>org.apache.spark</groupId>
    +          <artifactId>spark-parent</artifactId>
    +          <version>1.1.0-SNAPSHOT</version>
    +          <relativePath>../../pom.xml</relativePath>
    +        </parent>
    +
    +        <groupId>org.apache.spark</groupId>
    +        <artifactId>spark-hbase_2.10</artifactId>
    +        <properties>
    +           <sbt.project.name>spark-hbase</sbt.project.name>
    +        </properties>
    +        <packaging>jar</packaging>
    +        <name>Spark Project External Flume</name>
    +        <url>http://spark.apache.org/</url>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.scalatest</groupId>
    +			<artifactId>scalatest_${scala.binary.version}</artifactId>
    +			<version>2.2.0</version>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>com.google.guava</groupId>
    +			<artifactId>guava</artifactId>
    +			<version>14.0.1</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hadoop</groupId>
    +			<artifactId>hadoop-client</artifactId>
    +			<version>2.3.0</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>com.google.guava</groupId>
    +					<artifactId>guava</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hadoop</groupId>
    +			<artifactId>hadoop-common</artifactId>
    +			<version>2.3.0</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hadoop</groupId>
    +			<artifactId>hadoop-common</artifactId>
    +			<version>2.3.0</version>
    +			<type>test-jar</type>
    +			<classifier>tests</classifier>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.spark</groupId>
    +			<artifactId>spark-core_${scala.binary.version}</artifactId>
    +			<version>1.0.0</version>
    --- End diff --
    
    ${project.version}, not hard-coded to 1.0.0


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15438411
  
    --- Diff: external/hbase/pom.xml ---
    @@ -0,0 +1,217 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +      <modelVersion>4.0.0</modelVersion>
    +        <parent>
    +          <groupId>org.apache.spark</groupId>
    +          <artifactId>spark-parent</artifactId>
    +          <version>1.1.0-SNAPSHOT</version>
    +          <relativePath>../../pom.xml</relativePath>
    +        </parent>
    +
    +        <groupId>org.apache.spark</groupId>
    +        <artifactId>spark-hbase_2.10</artifactId>
    +        <properties>
    +           <sbt.project.name>spark-hbase</sbt.project.name>
    +        </properties>
    +        <packaging>jar</packaging>
    +        <name>Spark Project External Flume</name>
    +        <url>http://spark.apache.org/</url>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.scalatest</groupId>
    +			<artifactId>scalatest_${scala.binary.version}</artifactId>
    +			<version>2.2.0</version>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>com.google.guava</groupId>
    +			<artifactId>guava</artifactId>
    +			<version>14.0.1</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hadoop</groupId>
    +			<artifactId>hadoop-client</artifactId>
    +			<version>2.3.0</version>
    --- End diff --
    
    You shouldn't specify versions in the children poms. In fact, you can't, since this one in particular has to be overridden by `hadoop.version`. You can remove all `<version>` in this file. In fact there is already an `hbase.version` defined in the parent, which affects the code examples. You may also have to harmonize the examples with the newer HBase.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/1608#issuecomment-50513421
  
    Does HBase have some sort of schema information? If yes, maybe we can add it as a data source in SchemaRDD?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15438469
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseStreamingBulkPutExample.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase.example
    +
    +import org.apache.spark.SparkContext
    +import org.apache.hadoop.hbase.HBaseConfiguration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.hbase.util.Bytes
    +import org.apache.hadoop.hbase.client.Put
    +import org.apache.spark.hbase.HBaseContext
    +import org.apache.spark.streaming.StreamingContext
    +import org.apache.spark.streaming.Seconds
    +import org.apache.spark.SparkConf
    +
    +object HBaseStreamingBulkPutExample {
    +  def main(args: Array[String]) {
    +    if (args.length == 0) {
    +        System.out.println("HBaseStreamingBulkPutExample {master} {host} {port} {tableName} {columnFamily}");
    +        return;
    +      }
    +      
    +      val master = args(0);
    +      val host = args(1);
    +      val port = args(2);
    +      val tableName = args(3);
    +      val columnFamily = args(4);
    +      
    +      System.out.println("master:" + master)
    +      System.out.println("host:" + host)
    +      System.out.println("port:" + Integer.parseInt(port))
    --- End diff --
    
    Another example: in Scala this is just `port.toInt`
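
    For illustration, a minimal sketch (not part of the patch) of how the example's argument handling could look in idiomatic Scala, using `port.toInt` and `println`; the argument names are the ones from the quoted example:

    ```
    object HBaseStreamingBulkPutExample {
      def main(args: Array[String]): Unit = {
        if (args.length < 5) {
          println("HBaseStreamingBulkPutExample {master} {host} {port} {tableName} {columnFamily}")
          return
        }
        // exactly five positional arguments, as in the quoted example
        val Array(master, host, port, tableName, columnFamily) = args.take(5)
        println(s"master:$master")
        println(s"host:$host")
        println(s"port:${port.toInt}")  // port.toInt instead of Integer.parseInt(port)
      }
    }
    ```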


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15437166
  
    --- Diff: external/hbase/pom.xml ---
    @@ -0,0 +1,217 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +      <modelVersion>4.0.0</modelVersion>
    +        <parent>
    +          <groupId>org.apache.spark</groupId>
    +          <artifactId>spark-parent</artifactId>
    +          <version>1.1.0-SNAPSHOT</version>
    +          <relativePath>../../pom.xml</relativePath>
    +        </parent>
    +
    +        <groupId>org.apache.spark</groupId>
    +        <artifactId>spark-hbase_2.10</artifactId>
    +        <properties>
    +           <sbt.project.name>spark-hbase</sbt.project.name>
    +        </properties>
    +        <packaging>jar</packaging>
    +        <name>Spark Project External Flume</name>
    +        <url>http://spark.apache.org/</url>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.scalatest</groupId>
    +			<artifactId>scalatest_${scala.binary.version}</artifactId>
    +			<version>2.2.0</version>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>com.google.guava</groupId>
    --- End diff --
    
    Two spaces for indentation


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15438438
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala ---
    @@ -0,0 +1,544 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.HBaseConfiguration
    +import org.apache.spark.rdd.RDD
    +import java.io.ByteArrayOutputStream
    +import java.io.DataOutputStream
    +import java.io.ByteArrayInputStream
    +import java.io.DataInputStream
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import org.apache.spark.api.java.JavaPairRDD
    +import java.io.OutputStream
    +import org.apache.hadoop.hbase.client.HTable
    +import org.apache.hadoop.hbase.client.Scan
    +import org.apache.hadoop.hbase.client.Get
    +import java.util.ArrayList
    +import org.apache.hadoop.hbase.client.Result
    +import scala.reflect.ClassTag
    +import org.apache.hadoop.hbase.client.HConnection
    +import org.apache.hadoop.hbase.client.Put
    +import org.apache.hadoop.hbase.client.Increment
    +import org.apache.hadoop.hbase.client.Delete
    +import org.apache.spark.SparkContext
    +import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    +import org.apache.hadoop.hbase.util.Bytes
    +import org.apache.hadoop.mapreduce.Job
    +import org.apache.hadoop.hbase.mapreduce.TableMapper
    +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
    +import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    +import org.apache.hadoop.hbase.util.Base64
    +import org.apache.hadoop.hbase.mapreduce.MutationSerialization
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization
    +import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization
    +import org.apache.spark.rdd.HadoopRDD
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.SerializableWritable
    +import java.util.HashMap
    +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos
    +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos
    +import java.util.concurrent.atomic.AtomicInteger
    +import org.apache.hadoop.hbase.HConstants
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import java.util.TimerTask
    +import org.apache.hadoop.hbase.client.Mutation
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * HBaseContext is a façade of simple and complex HBase operations
    + * like bulk put, get, increment, delete, and scan
    + *
    + * HBase Context will take the responsibilities to happen to
    + * complexity of disseminating the configuration information
    + * to the working and managing the life cycle of HConnections.
    + *
    + * First constructor:
    + *  @param sc - active SparkContext
    + *  @param broadcastedConf - This is a Broadcast object that holds a
    + * serializable Configuration object
    + *
    + */
    +@serializable class HBaseContext(@transient sc: SparkContext,
    +  broadcastedConf: Broadcast[SerializableWritable[Configuration]]) {
    +
    +  /**
    +   * Second constructor option:
    +   *  @param sc     active SparkContext
    +   *  @param config Configuration object to make connection to HBase
    +   */
    +  def this(@transient sc: SparkContext, @transient config: Configuration) {
    +    this(sc, sc.broadcast(new SerializableWritable(config)))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD foreachPartition.
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param RDD[t]  Original RDD with data to iterate over
    +   * @param f       function to be given a iterator to iterate through
    +   *                the RDD values and a HConnection object to interact 
    +   *                with HBase 
    +   */
    +  def foreachPartition[T](rdd: RDD[T],
    +    f: (Iterator[T], HConnection) => Unit) = {
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition(broadcastedConf, it, f))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark Streaming dStream foreach
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param DStream[t]  Original DStream with data to iterate over
    +   * @param f           function to be given a iterator to iterate through
    +   *                    the DStream values and a HConnection object to 
    +   *                    interact with HBase 
    +   */
    +  def streamForeach[T](dstream: DStream[T],
    +    f: (Iterator[T], HConnection) => Unit) = {
    +    dstream.foreach((rdd, time) => {
    +      foreachPartition(rdd, f)
    +    })
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD mapPartition.
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * Note: Make sure to partition correctly to avoid memory issue when
    +   *       getting data from HBase
    +   * 
    +   * @param RDD[t]  Original RDD with data to iterate over
    +   * @param mp      function to be given a iterator to iterate through
    +   *                the RDD values and a HConnection object to interact 
    +   *                with HBase
    +   * @return        Returns a new RDD generated by the user definition
    +   *                function just like normal mapPartition
    +   */
    +  def mapPartition[T, U: ClassTag](rdd: RDD[T],
    +    mp: (Iterator[T], HConnection) => Iterator[U]): RDD[U] = {
    +
    +    rdd.mapPartitions[U](it => hbaseMapPartition[T, U](broadcastedConf,
    +      it,
    +      mp), true)
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark Streaming DStream
    +   * mapPartition.
    +   * 
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * Note: Make sure to partition correctly to avoid memory issue when
    +   *       getting data from HBase
    +   * 
    +   * @param DStream[t] Original DStream with data to iterate over
    +   * @param mp         function to be given a iterator to iterate through
    +   *                   the DStream values and a HConnection object to 
    +   *                   interact with HBase
    +   * @return           Returns a new DStream generated by the user 
    +   *                   definition function just like normal mapPartition
    +   */
    +  def streamMap[T, U: ClassTag](dstream: DStream[T],
    +    mp: (Iterator[T], HConnection) => Iterator[U]): DStream[U] = {
    +
    +    dstream.mapPartitions(it => hbaseMapPartition[T, U](broadcastedConf,
    +      it,
    +      mp), true)
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.foreachPartition method.
    +   * 
    +   * It allow addition support for a user to take RDD 
    +   * and generate puts and send them to HBase.  
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param RDD[t]  Original RDD with data to iterate over
    +   * @tableName     The name of the table to put into 
    --- End diff --
    
    This has to be `@param tableName` right? or else I sure wasn't aware scaladoc supported that syntax
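
    For reference, a sketch of how that scaladoc block could read with standard `@param` tags (parameter names taken from the quoted `bulkPut` signature; the `???` body is only a placeholder):

    ```
    /**
     * A simple abstraction over the HBaseContext.foreachPartition method.
     *
     * @param rdd       Original RDD with data to iterate over
     * @param tableName The name of the table to put into
     * @param f         Function to convert a value in the RDD to an HBase Put
     * @param autoFlush Whether autoFlush should be turned on
     */
    def bulkPut[T](rdd: RDD[T], tableName: String, f: (T) => Put, autoFlush: Boolean): Unit = ???
    ```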


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15557786
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala ---
    @@ -0,0 +1,538 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.HBaseConfiguration
    +import org.apache.spark.rdd.RDD
    +import java.io.ByteArrayOutputStream
    +import java.io.DataOutputStream
    +import java.io.ByteArrayInputStream
    +import java.io.DataInputStream
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import org.apache.spark.api.java.JavaPairRDD
    +import java.io.OutputStream
    +import org.apache.hadoop.hbase.client.HTable
    +import org.apache.hadoop.hbase.client.Scan
    +import org.apache.hadoop.hbase.client.Get
    +import java.util.ArrayList
    +import org.apache.hadoop.hbase.client.Result
    +import scala.reflect.ClassTag
    +import org.apache.hadoop.hbase.client.HConnection
    +import org.apache.hadoop.hbase.client.Put
    +import org.apache.hadoop.hbase.client.Increment
    +import org.apache.hadoop.hbase.client.Delete
    +import org.apache.spark.SparkContext
    +import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    +import org.apache.hadoop.hbase.util.Bytes
    +import org.apache.hadoop.mapreduce.Job
    +import org.apache.hadoop.hbase.mapreduce.TableMapper
    +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
    +import org.apache.hadoop.hbase.util.Base64
    +import org.apache.spark.rdd.HadoopRDD
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.SerializableWritable
    +import java.util.HashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +import org.apache.hadoop.hbase.HConstants
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import java.util.TimerTask
    +import org.apache.hadoop.hbase.client.Mutation
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * HBaseContext is a façade of simple and complex HBase operations
    + * like bulk put, get, increment, delete, and scan
    + *
    + * HBase Context will take the responsibilities to happen to
    + * complexity of disseminating the configuration information
    + * to the working and managing the life cycle of HConnections.
    + *
    + * First constructor:
    + *  @param sc              Active SparkContext
    + *  @param broadcastedConf This is a Broadcast object that holds a
    + * serializable Configuration object
    + *
    + */
    +@serializable class HBaseContext(@transient sc: SparkContext,
    +  broadcastedConf: Broadcast[SerializableWritable[Configuration]]) {
    +
    --- End diff --
    
    Is there a need for this constructor? Better to do the following.
    
    ```
    class HBaseContext(@transient sc: SparkContext, @transient config: Configuration) extends Serializable {
        val broadcastConf = sc.broadcast(new SerializableWritable(config))
    }
    ```
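
    Caller-side usage is unchanged by that suggestion; either way the context is built from a SparkContext and an HBase Configuration, e.g. (a sketch mirroring the quoted examples):

    ```
    val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
    ```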


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by saintstack <gi...@git.apache.org>.
Github user saintstack commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r16607705
  
    --- Diff: external/hbase/pom.xml ---
    @@ -0,0 +1,140 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    + ~ Licensed to the Apache Software Foundation (ASF) under one or more
    + ~ contributor license agreements. See the NOTICE file distributed with
    + ~ this work for additional information regarding copyright ownership.
    + ~ The ASF licenses this file to You under the Apache License, Version 2.0
    + ~ (the "License"); you may not use this file except in compliance with
    + ~ the License. You may obtain a copy of the License at
    + ~
    + ~  http://www.apache.org/licenses/LICENSE-2.0
    + ~
    + ~ Unless required by applicable law or agreed to in writing, software
    + ~ distributed under the License is distributed on an "AS IS" BASIS,
    + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + ~ See the License for the specific language governing permissions and
    + ~ limitations under the License.
    + -->
    + 
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    + <modelVersion>4.0.0</modelVersion>
    +  <parent>
    +   <groupId>org.apache.spark</groupId>
    +   <artifactId>spark-parent</artifactId>
    +   <version>1.1.0-SNAPSHOT</version>
    +   <relativePath>../../pom.xml</relativePath>
    +  </parent>
    +
    +  <groupId>org.apache.spark</groupId>
    +  <artifactId>spark-hbase_2.10</artifactId>
    --- End diff --
    
    Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15555354
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala ---
    @@ -0,0 +1,544 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.HBaseConfiguration
    --- End diff --
    
    And also group them together like 
    `import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream}`
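
    Applied to the imports actually used in this file, the grouping might look like the following sketch (covering only classes already imported above):

    ```
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, OutputStream}
    import java.util.{ArrayList, HashMap, Timer, TimerTask}
    import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
    import org.apache.hadoop.hbase.client.{Delete, Get, HConnection, HConnectionManager, HTable, Increment, Mutation, Put, Result, Scan}
    ```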


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1608#issuecomment-50254635
  
    QA tests have started for PR 1608. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17237/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1608#issuecomment-50910326
  
    QA tests have started for PR 1608. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17682/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15438442
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala ---
    @@ -0,0 +1,544 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.HBaseConfiguration
    +import org.apache.spark.rdd.RDD
    +import java.io.ByteArrayOutputStream
    +import java.io.DataOutputStream
    +import java.io.ByteArrayInputStream
    +import java.io.DataInputStream
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import org.apache.spark.api.java.JavaPairRDD
    +import java.io.OutputStream
    +import org.apache.hadoop.hbase.client.HTable
    +import org.apache.hadoop.hbase.client.Scan
    +import org.apache.hadoop.hbase.client.Get
    +import java.util.ArrayList
    +import org.apache.hadoop.hbase.client.Result
    +import scala.reflect.ClassTag
    +import org.apache.hadoop.hbase.client.HConnection
    +import org.apache.hadoop.hbase.client.Put
    +import org.apache.hadoop.hbase.client.Increment
    +import org.apache.hadoop.hbase.client.Delete
    +import org.apache.spark.SparkContext
    +import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    +import org.apache.hadoop.hbase.util.Bytes
    +import org.apache.hadoop.mapreduce.Job
    +import org.apache.hadoop.hbase.mapreduce.TableMapper
    +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
    +import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    +import org.apache.hadoop.hbase.util.Base64
    +import org.apache.hadoop.hbase.mapreduce.MutationSerialization
    +import org.apache.hadoop.hbase.mapreduce.ResultSerialization
    +import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization
    +import org.apache.spark.rdd.HadoopRDD
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.SerializableWritable
    +import java.util.HashMap
    +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos
    +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos
    +import java.util.concurrent.atomic.AtomicInteger
    +import org.apache.hadoop.hbase.HConstants
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import java.util.TimerTask
    +import org.apache.hadoop.hbase.client.Mutation
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * HBaseContext is a façade of simple and complex HBase operations
    + * like bulk put, get, increment, delete, and scan
    + *
    + * HBase Context will take the responsibilities to happen to
    + * complexity of disseminating the configuration information
    + * to the working and managing the life cycle of HConnections.
    + *
    + * First constructor:
    + *  @param sc - active SparkContext
    + *  @param broadcastedConf - This is a Broadcast object that holds a
    + * serializable Configuration object
    + *
    + */
    +@serializable class HBaseContext(@transient sc: SparkContext,
    +  broadcastedConf: Broadcast[SerializableWritable[Configuration]]) {
    +
    +  /**
    +   * Second constructor option:
    +   *  @param sc     active SparkContext
    +   *  @param config Configuration object to make connection to HBase
    +   */
    +  def this(@transient sc: SparkContext, @transient config: Configuration) {
    +    this(sc, sc.broadcast(new SerializableWritable(config)))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD foreachPartition.
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param RDD[t]  Original RDD with data to iterate over
    +   * @param f       function to be given a iterator to iterate through
    +   *                the RDD values and a HConnection object to interact 
    +   *                with HBase 
    +   */
    +  def foreachPartition[T](rdd: RDD[T],
    +    f: (Iterator[T], HConnection) => Unit) = {
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition(broadcastedConf, it, f))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark Streaming dStream foreach
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param DStream[t]  Original DStream with data to iterate over
    +   * @param f           function to be given a iterator to iterate through
    +   *                    the DStream values and a HConnection object to 
    +   *                    interact with HBase 
    +   */
    +  def streamForeach[T](dstream: DStream[T],
    +    f: (Iterator[T], HConnection) => Unit) = {
    +    dstream.foreach((rdd, time) => {
    +      foreachPartition(rdd, f)
    +    })
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD mapPartition.
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * Note: Make sure to partition correctly to avoid memory issue when
    +   *       getting data from HBase
    +   * 
    +   * @param RDD[t]  Original RDD with data to iterate over
    +   * @param mp      function to be given a iterator to iterate through
    +   *                the RDD values and a HConnection object to interact 
    +   *                with HBase
    +   * @return        Returns a new RDD generated by the user definition
    +   *                function just like normal mapPartition
    +   */
    +  def mapPartition[T, U: ClassTag](rdd: RDD[T],
    +    mp: (Iterator[T], HConnection) => Iterator[U]): RDD[U] = {
    +
    +    rdd.mapPartitions[U](it => hbaseMapPartition[T, U](broadcastedConf,
    +      it,
    +      mp), true)
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark Streaming DStream
    +   * mapPartition.
    +   * 
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * Note: Make sure to partition correctly to avoid memory issue when
    +   *       getting data from HBase
    +   * 
    +   * @param DStream[t] Original DStream with data to iterate over
    +   * @param mp         function to be given a iterator to iterate through
    +   *                   the DStream values and a HConnection object to 
    +   *                   interact with HBase
    +   * @return           Returns a new DStream generated by the user 
    +   *                   definition function just like normal mapPartition
    +   */
    +  def streamMap[T, U: ClassTag](dstream: DStream[T],
    +    mp: (Iterator[T], HConnection) => Iterator[U]): DStream[U] = {
    +
    +    dstream.mapPartitions(it => hbaseMapPartition[T, U](broadcastedConf,
    +      it,
    +      mp), true)
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.foreachPartition method.
    +   * 
    +   * It allow addition support for a user to take RDD 
    +   * and generate puts and send them to HBase.  
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param RDD[t]  Original RDD with data to iterate over
    +   * @tableName     The name of the table to put into 
    +   * @param f       function to convert a value in the RDD to a HBase Put
    +   * @autoFlush     if autoFlush should be turned on
    +   */
    +  def bulkPut[T](rdd: RDD[T], tableName: String, f: (T) => Put, autoFlush: Boolean) {
    +
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition[T](
    +        broadcastedConf,
    +        it,
    +        (iterator, hConnection) => {
    +          val htable = hConnection.getTable(tableName)
    +          htable.setAutoFlush(autoFlush, true)
    +          iterator.foreach(T => htable.put(f(T)))
    +          htable.flushCommits()
    +          htable.close()
    +        }))
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.streamMapPartition method.
    +   * 
    +   * It allow addition support for a user to take a DStream and 
    +   * generate puts and send them to HBase.  
    +   * 
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param DStream[t] Original DStream with data to iterate over
    +   * @tableName        The name of the table to put into 
    +   * @param f          function to convert a value in the RDD to a HBase Put
    +   * @autoFlush        if autoFlush should be turned on
    +   */
    +  def streamBulkPut[T](dstream: DStream[T],
    +    tableName: String,
    +    f: (T) => Put,
    +    autoFlush: Boolean) = {
    +    dstream.foreach((rdd, time) => {
    +      bulkPut(rdd, tableName, f, autoFlush)
    +    })
    +  }
    +
    +  /**
    +   * A simple abstraction over the HBaseContext.foreachPartition method.
    +   * 
    +   * It allow addition support for a user to take a RDD and 
    +   * generate increments and send them to HBase.  
    +   * 
    +   * The complexity of even the HConnection is 
    +   * removed from the developer
    +   * 
    +   * @param RDD[t]  Original RDD with data to iterate over
    +   * @tableName     The name of the table to increment to 
    +   * @param f       function to convert a value in the RDD to a 
    +   *                HBase Increments
    +   * @batchSize     The number of increments to batch before sending to HBase
    +   */
    +  def bulkIncrement[T](rdd: RDD[T], tableName:String, f:(T) => Increment, batchSize: Integer) {
    --- End diff --
    
    batchSize is declared as a `java.lang.Integer` instead of a `scala.Int`. I bet it all kind of works but would `Int` not be more standard?
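
    For concreteness, the same signature with `scala.Int`; only the type of `batchSize` changes, and `???` is a placeholder body:

    ```
    def bulkIncrement[T](rdd: RDD[T], tableName: String, f: (T) => Increment, batchSize: Int): Unit = ???
    ```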


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15563443
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala ---
    @@ -0,0 +1,538 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.HBaseConfiguration
    +import org.apache.spark.rdd.RDD
    +import java.io.ByteArrayOutputStream
    +import java.io.DataOutputStream
    +import java.io.ByteArrayInputStream
    +import java.io.DataInputStream
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import org.apache.spark.api.java.JavaPairRDD
    +import java.io.OutputStream
    +import org.apache.hadoop.hbase.client.HTable
    +import org.apache.hadoop.hbase.client.Scan
    +import org.apache.hadoop.hbase.client.Get
    +import java.util.ArrayList
    +import org.apache.hadoop.hbase.client.Result
    +import scala.reflect.ClassTag
    +import org.apache.hadoop.hbase.client.HConnection
    +import org.apache.hadoop.hbase.client.Put
    +import org.apache.hadoop.hbase.client.Increment
    +import org.apache.hadoop.hbase.client.Delete
    +import org.apache.spark.SparkContext
    +import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    +import org.apache.hadoop.hbase.util.Bytes
    +import org.apache.hadoop.mapreduce.Job
    +import org.apache.hadoop.hbase.mapreduce.TableMapper
    +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
    +import org.apache.hadoop.hbase.util.Base64
    +import org.apache.spark.rdd.HadoopRDD
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.SerializableWritable
    +import java.util.HashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +import org.apache.hadoop.hbase.HConstants
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import java.util.TimerTask
    +import org.apache.hadoop.hbase.client.Mutation
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * HBaseContext is a façade of simple and complex HBase operations
    + * like bulk put, get, increment, delete, and scan
    + *
    + * HBase Context will take the responsibilities to happen to
    + * complexity of disseminating the configuration information
    + * to the working and managing the life cycle of HConnections.
    + *
    + * First constructor:
    + *  @param sc              Active SparkContext
    + *  @param broadcastedConf This is a Broadcast object that holds a
    + * serializable Configuration object
    + *
    + */
    +@serializable class HBaseContext(@transient sc: SparkContext,
    +  broadcastedConf: Broadcast[SerializableWritable[Configuration]]) {
    +
    +  /**
    +   * Second constructor option:
    +   *  @param sc     Active SparkContext
    +   *  @param config Configuration object to make connection to HBase
    +   */
    +  def this(@transient sc: SparkContext, @transient config: Configuration) {
    +    this(sc, sc.broadcast(new SerializableWritable(config)))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD foreachPartition.
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param RDD  Original RDD with data to iterate over
    +   * @param f    Function to be given a iterator to iterate through
    +   *             the RDD values and a HConnection object to interact 
    +   *             with HBase 
    +   */
    +  def foreachPartition[T](rdd: RDD[T],
    +    f: (Iterator[T], HConnection) => Unit) = {
    +    rdd.foreachPartition(
    +      it => hbaseForeachPartition(broadcastedConf, it, f))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark Streaming dStream foreach
    +   * This function differs from the original in that it offers the 
    +   * developer access to a already connected HConnection object
    +   * 
    +   * Note: Do not close the HConnection object.  All HConnection
    +   * management is handled outside this method
    +   * 
    +   * @param DStream     Original DStream with data to iterate over
    +   * @param f           Function to be given a iterator to iterate through
    +   *                    the DStream values and a HConnection object to 
    +   *                    interact with HBase 
    +   */
    +  def streamForeach[T](dstream: DStream[T],
    --- End diff --
    
    This should be foreachRDD(). DStream.foreach() is deprecated in favor of DStream.foreachRDD().
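
    A minimal sketch of the same helper written against the non-deprecated API (the body is otherwise identical to the quoted one):

    ```
    def streamForeach[T](dstream: DStream[T],
        f: (Iterator[T], HConnection) => Unit): Unit = {
      dstream.foreachRDD(rdd => foreachPartition(rdd, f))
    }
    ```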



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15438453
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HConnectionStaticCache.scala ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import java.util.HashMap
    +import org.apache.hadoop.hbase.client.HConnection
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.HConstants
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import java.util.TimerTask
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.Logging
    +
    +/**
    + * A static caching class that will manage all HConnection in a worker
    + * 
    + * The main idea is there is a hashMap with 
    + * HConstants.HBASE_CLIENT_INSTANCE_ID which is ("hbase.client.instance.id")
    + * 
    + * In that HashMap there is three things
    + *   - HConnection
    + *   - Number of checked out users of the HConnection
    + *   - Time since the HConnection was last used
    + *   
    + * There is also a Timer thread that will start up every 2 minutes
    + * When the Timer thread starts up it will look for HConnection with no
    + * checked out users and a last used time that is older then 1 minute.
    + * 
    + * This class is not intended to be used by Users
    + */
    +object HConnectionStaticCache extends Logging{
    +  @transient private val hconnectionMap = 
    +    new HashMap[String, (HConnection, AtomicInteger, AtomicLong)]
    +
    +  @transient private val hconnectionTimeout = 60000
    +
    +  @transient private val hconnectionCleaner = new Timer
    +
    +  hconnectionCleaner.schedule(new hconnectionCleanerTask, hconnectionTimeout * 2)
    +
    +  /**
    +   * Gets or starts a HConnection based on a config object
    +   */
    +  def getHConnection(config: Configuration): HConnection = {
    +    val instanceId = config.get(HConstants.HBASE_CLIENT_INSTANCE_ID)
    +    var hconnectionAndCounter = hconnectionMap.get(instanceId)
    --- End diff --
    
    `hconnectionMap` is accessed without synchronization here, when it may be being mutated. This could fail. Is this trying to implement double-checked locking? How about just a synchronized Map implementation and a call to `getOrElse`?
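
    One possible shape of that suggestion, as a rough sketch only: the object name here is invented, and a `SynchronizedMap` mixin synchronizes individual operations rather than the whole check-then-create, so the real class would still need its reference counting and cleanup timer.

    ```
    import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
    import scala.collection.mutable.{HashMap, SynchronizedMap}
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.HConstants
    import org.apache.hadoop.hbase.client.{HConnection, HConnectionManager}

    object HConnectionCacheSketch {
      private val hconnectionMap =
        new HashMap[String, (HConnection, AtomicInteger, AtomicLong)]
          with SynchronizedMap[String, (HConnection, AtomicInteger, AtomicLong)]

      def getHConnection(config: Configuration): HConnection = {
        val instanceId = config.get(HConstants.HBASE_CLIENT_INSTANCE_ID)
        // look up or create in one call instead of a hand-rolled check-then-put
        val (hconnection, refCount, lastUsed) = hconnectionMap.getOrElseUpdate(instanceId,
          (HConnectionManager.createConnection(config), new AtomicInteger(0), new AtomicLong()))
        refCount.incrementAndGet()
        lastUsed.set(System.currentTimeMillis())
        hconnection
      }
    }
    ```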


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1608#issuecomment-50546377
  
    QA results for PR 1608:
    - This patch FAILED unit tests.
    - This patch merges cleanly
    - This patch adds the following public classes (experimental):
    @serializable class HBaseContext(@transient sc: SparkContext,
    protected class hconnectionCleanerTask extends TimerTask {

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17382/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1608#issuecomment-50919337
  
    QA results for PR 1608:
    - This patch FAILED unit tests.
    - This patch merges cleanly
    - This patch adds the following public classes (experimental):
    @serializable class HBaseContext(@transient sc: SparkContext,
    protected class hconnectionCleanerTask extends TimerTask {

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17684/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1608#issuecomment-50914144
  
    QA tests have started for PR 1608. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17684/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by tmalaska <gi...@git.apache.org>.
Github user tmalaska closed the pull request at:

    https://github.com/apache/spark/pull/1608


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15438461
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HConnectionStaticCache.scala ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import java.util.HashMap
    +import org.apache.hadoop.hbase.client.HConnection
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.HConstants
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import java.util.TimerTask
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.Logging
    +
    +/**
    + * A static caching class that will manage all HConnection in a worker
    + * 
    + * The main idea is there is a hashMap with 
    + * HConstants.HBASE_CLIENT_INSTANCE_ID which is ("hbase.client.instance.id")
    + * 
    + * In that HashMap there is three things
    + *   - HConnection
    + *   - Number of checked out users of the HConnection
    + *   - Time since the HConnection was last used
    + *   
    + * There is also a Timer thread that will start up every 2 minutes
    + * When the Timer thread starts up it will look for HConnection with no
    + * checked out users and a last used time that is older then 1 minute.
    + * 
    + * This class is not intended to be used by Users
    + */
    +object HConnectionStaticCache extends Logging{
    +  @transient private val hconnectionMap = 
    +    new HashMap[String, (HConnection, AtomicInteger, AtomicLong)]
    --- End diff --
    
    This is using a lot of the Java collections API instead of the Scala API. I think the latter would be more standard. In fact, it would help in a number of cases, like the one in the next comment.
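
    For illustration, a minimal sketch of what the Scala-collection declaration could look like (the type parameters are taken from the diff above; this mirrors the synchronized mutable HashMap used by the later revision of the patch):

        import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
        import scala.collection.mutable.{HashMap, SynchronizedMap}
        import org.apache.hadoop.hbase.client.HConnection

        // A hypothetical Scala-collection equivalent of the java.util.HashMap above.
        // SynchronizedMap mixes thread-safety into every map access, and the Scala
        // API also offers helpers such as getOrElseUpdate for the lookup-or-create case.
        @transient private val hconnectionMap =
          new HashMap[String, (HConnection, AtomicInteger, AtomicLong)]
            with SynchronizedMap[String, (HConnection, AtomicInteger, AtomicLong)]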



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by saintstack <gi...@git.apache.org>.
Github user saintstack commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r16607576
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HConnectionStaticCache.scala ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.client.HConnection
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.HConstants
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import java.util.TimerTask
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.Logging
    +import scala.collection.mutable.SynchronizedMap
    +import scala.collection.mutable.HashMap
    +
    +/**
    + * A static caching class that will manage all HConnection in a worker
    --- End diff --
    
    When you say 'caching', is it caching data fetched from HBase or caching HConnections?




[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15563085
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala ---
    @@ -0,0 +1,538 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.HBaseConfiguration
    +import org.apache.spark.rdd.RDD
    +import java.io.ByteArrayOutputStream
    +import java.io.DataOutputStream
    +import java.io.ByteArrayInputStream
    +import java.io.DataInputStream
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import org.apache.spark.api.java.JavaPairRDD
    +import java.io.OutputStream
    +import org.apache.hadoop.hbase.client.HTable
    +import org.apache.hadoop.hbase.client.Scan
    +import org.apache.hadoop.hbase.client.Get
    +import java.util.ArrayList
    +import org.apache.hadoop.hbase.client.Result
    +import scala.reflect.ClassTag
    +import org.apache.hadoop.hbase.client.HConnection
    +import org.apache.hadoop.hbase.client.Put
    +import org.apache.hadoop.hbase.client.Increment
    +import org.apache.hadoop.hbase.client.Delete
    +import org.apache.spark.SparkContext
    +import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    +import org.apache.hadoop.hbase.util.Bytes
    +import org.apache.hadoop.mapreduce.Job
    +import org.apache.hadoop.hbase.mapreduce.TableMapper
    +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
    +import org.apache.hadoop.hbase.util.Base64
    +import org.apache.spark.rdd.HadoopRDD
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.SerializableWritable
    +import java.util.HashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +import org.apache.hadoop.hbase.HConstants
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import java.util.TimerTask
    +import org.apache.hadoop.hbase.client.Mutation
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * HBaseContext is a façade of simple and complex HBase operations
    + * like bulk put, get, increment, delete, and scan
    + *
    + * HBaseContext takes responsibility for disseminating the
    + * configuration information to the workers and for managing
    + * the life cycle of HConnections.
    + *
    + * First constructor:
    + *  @param sc              Active SparkContext
    + *  @param broadcastedConf This is a Broadcast object that holds a
    + * serializable Configuration object
    + *
    + */
    +@serializable class HBaseContext(@transient sc: SparkContext,
    +  broadcastedConf: Broadcast[SerializableWritable[Configuration]]) {
    +
    +  /**
    +   * Second constructor option:
    +   *  @param sc     Active SparkContext
    +   *  @param config Configuration object to make connection to HBase
    +   */
    +  def this(@transient sc: SparkContext, @transient config: Configuration) {
    +    this(sc, sc.broadcast(new SerializableWritable(config)))
    +  }
    +
    +  /**
    +   * A simple enrichment of the traditional Spark RDD foreachPartition.
    --- End diff --
    
    Please take a look at the Scala docs of other RDD transformations to understand the style. For example, a better way to explain this method would be the following:
    `Applies a function on each partition of the given RDD using a HBase connection`.
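
    As a rough illustration only (the method signature is not shown in the truncated diff, so the doc below is hypothetical), that style would look something like:

        /**
         * Applies a function on each partition of the given RDD using a HBase connection.
         *
         * @param rdd the RDD whose partitions are processed
         * @param f   the function applied to each partition together with an open HConnection
         */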




[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15438415
  
    --- Diff: external/hbase/pom.xml ---
    @@ -0,0 +1,217 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +      <modelVersion>4.0.0</modelVersion>
    +        <parent>
    +          <groupId>org.apache.spark</groupId>
    +          <artifactId>spark-parent</artifactId>
    +          <version>1.1.0-SNAPSHOT</version>
    +          <relativePath>../../pom.xml</relativePath>
    +        </parent>
    +
    +        <groupId>org.apache.spark</groupId>
    +        <artifactId>spark-hbase_2.10</artifactId>
    +        <properties>
    +           <sbt.project.name>spark-hbase</sbt.project.name>
    +        </properties>
    +        <packaging>jar</packaging>
    +        <name>Spark Project External Flume</name>
    +        <url>http://spark.apache.org/</url>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.scalatest</groupId>
    +			<artifactId>scalatest_${scala.binary.version}</artifactId>
    +			<version>2.2.0</version>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>com.google.guava</groupId>
    +			<artifactId>guava</artifactId>
    +			<version>14.0.1</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hadoop</groupId>
    +			<artifactId>hadoop-client</artifactId>
    +			<version>2.3.0</version>
    +			<exclusions>
    --- End diff --
    
    For the same reason, don't add exclusions here. They should be in the parent. There should only be exclusions if there is a specific reason that this component needs different dependencies in this module alone. The parent already gets the Hadoop exclusions right across a range of versions.



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by saintstack <gi...@git.apache.org>.
Github user saintstack commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r16608329
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HConnectionStaticCache.scala ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.client.HConnection
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.atomic.AtomicLong
    +import java.util.Timer
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.hbase.HConstants
    +import org.apache.hadoop.hbase.client.HConnectionManager
    +import java.util.TimerTask
    +import scala.collection.mutable.MutableList
    +import org.apache.spark.Logging
    +import scala.collection.mutable.SynchronizedMap
    +import scala.collection.mutable.HashMap
    +
    +/**
    + * A static caching class that will manage all HConnection in a worker
    + *
    + * The main idea is there is a hashMap with
    + * HConstants.HBASE_CLIENT_INSTANCE_ID which is ("hbase.client.instance.id")
    + *
    + * In that HashMap there are three things
    + *   - HConnection
    + *   - Number of checked out users of the HConnection
    + *   - Time since the HConnection was last used
    + *
    + * There is also a Timer thread that will start up every 2 minutes
    + * When the Timer thread starts up it will look for HConnections with no
    + * checked out users and a last used time that is older than 1 minute.
    + *
    + * This class is not intended to be used by Users
    + */
    +object HConnectionStaticCache extends Logging {
    +  @transient private val hconnectionMap =
    +    new HashMap[String, (HConnection, AtomicInteger, AtomicLong)] with SynchronizedMap[String, (HConnection, AtomicInteger, AtomicLong)]
    +
    +  @transient private val hconnectionTimeout = 60000
    +
    +  @transient private val hconnectionCleaner = new Timer
    +
    +  hconnectionCleaner.schedule(new hconnectionCleanerTask, hconnectionTimeout * 2)
    +
    +  /**
    +   * Gets or starts a HConnection based on a config object
    +   */
    +  def getHConnection(config: Configuration): HConnection = {
    +    val instanceId = config.get(HConstants.HBASE_CLIENT_INSTANCE_ID)
    +    var hconnectionAndCounter = hconnectionMap.get(instanceId).getOrElse(null)
    +    if (hconnectionAndCounter == null) {
    +      hconnectionMap.synchronized { 
    +        hconnectionAndCounter = hconnectionMap.get(instanceId).getOrElse(null)
    +        if (hconnectionAndCounter == null) {
    +
    +          val hConnection = HConnectionManager.createConnection(config)
    +          hconnectionAndCounter = (hConnection, new AtomicInteger, new AtomicLong)
    +          hconnectionMap.put(instanceId, hconnectionAndCounter)
    +
    +        }
    +      }
    +      logDebug("Created hConnection '" + instanceId + "'");
    +    } else {
    +      logDebug("Get hConnection from cache '" + instanceId + "'");
    +    }
    +    hconnectionAndCounter._2.incrementAndGet()
    +    return hconnectionAndCounter._1
    +  }
    +
    +  /**
    +   * tell us a thread is no longer using a HConnection
    +   */
    +  def finishWithHConnection(config: Configuration, hconnection: HConnection) {
    +    val instanceId = config.get(HConstants.HBASE_CLIENT_INSTANCE_ID)
    +
    +    var hconnectionAndCounter = hconnectionMap.get(instanceId).getOrElse(null)
    +    if (hconnectionAndCounter != null) {
    +      var usesLeft = hconnectionAndCounter._2.decrementAndGet()
    +      if (usesLeft < 0) {
    +        hconnectionAndCounter._2.set(0)
    +        usesLeft = 0
    +      }
    +      if (usesLeft == 0) {
    +        hconnectionAndCounter._3.set(System.currentTimeMillis())
    +        logDebug("Finished last use of hconnection '" + instanceId + "'");
    +      } else {
    +        logDebug("Finished a use of hconnection '" + instanceId + "' with " + usesLeft + " uses left");
    +      }
    +    } else {
    +      logWarning("Tried to remove use of '" + instanceId + "' but nothing was there");
    +    }
    +  }
    +
    +  /**
    +   * The timer thread that cleans up the HashMap of Collections
    +   */
    +  protected class hconnectionCleanerTask extends TimerTask {
    +    override def run() {
    +
    +      logDebug("Running hconnectionCleanerTask:" + hconnectionMap.size);
    +
    +      val removeList = new MutableList[String]
    +
    +      hconnectionMap.foreach(entry => {
    +        if (entry._1 == 0 &&
    +          entry._2._3.get() + 60000 < System.currentTimeMillis()) {
    +          removeList.+=(entry._1)
    +        }
    +      })
    +
    +      if (removeList.length > 0) {
    +        hconnectionMap.synchronized {
    +          removeList.foreach(key => {
    +            val v = hconnectionMap.get(key).getOrElse(null)
    +            if (v != null) {
    +              if (v._2.get() == 0 &&
    +                v._3.get() + 60000 < System.currentTimeMillis()) {
    +  
    +                logDebug("closing hconnection: " + key);
    +  
    +                v._1.close()
    +  
    +                hconnectionMap.remove(key);
    +              }
    +            } else {
    +              logWarning("Tried to remove use of '" + key + "' but nothing was there");
    +            }
    +          })
    +        }
    +      }
    +    }
    +  }
    +
    +}
    --- End diff --
    
    You don't seem to have a unit test for this class. Suggest you add one to ensure the basic ref counting is doing its job.
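
    A minimal sketch of such a test, assuming an HBaseTestingUtility mini-cluster is acceptable for this suite (only getHConnection and finishWithHConnection come from the patch; everything else is illustrative):

        import org.apache.hadoop.hbase.HBaseTestingUtility
        import org.scalatest.{BeforeAndAfterAll, FunSuite}

        class HConnectionStaticCacheSuite extends FunSuite with BeforeAndAfterAll {

          private val htu = new HBaseTestingUtility

          override def beforeAll() { htu.startMiniCluster() }
          override def afterAll() { htu.shutdownMiniCluster() }

          test("getHConnection is reference counted per configuration") {
            val config = htu.getConfiguration

            // Two check-outs against the same config should return the same HConnection.
            val c1 = HConnectionStaticCache.getHConnection(config)
            val c2 = HConnectionStaticCache.getHConnection(config)
            assert(c1 eq c2)

            // Returning both uses must not close the connection immediately;
            // the cleaner task is responsible for closing idle connections later.
            HConnectionStaticCache.finishWithHConnection(config, c1)
            HConnectionStaticCache.finishWithHConnection(config, c2)
            assert(!c1.isClosed)
          }
        }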




[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15438472
  
    --- Diff: external/hbase/src/test/scala/org/apache/spark/hbase/HBaseContextSuite.scala ---
    @@ -0,0 +1,296 @@
    +package org.apache.spark.hbase
    --- End diff --
    
    Missed a copyright header



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by saintstack <gi...@git.apache.org>.
Github user saintstack commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r16608036
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/example/HBaseBulkDeleteExample.scala ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase.example
    +
    +import org.apache.spark.SparkContext
    +import org.apache.hadoop.hbase.HBaseConfiguration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.hbase.util.Bytes
    +import org.apache.spark.hbase.HBaseContext
    +import org.apache.hadoop.hbase.client.Delete
    +
    +object HBaseBulkDeleteExample {
    --- End diff --
    
    Class comment to state what this example does?
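
    For example, something along these lines (the wording is only a suggestion and should be adjusted to match what the example actually does):

        /**
         * An example that performs a bulk delete against an HBase table:
         * it builds an RDD of row keys and uses HBaseContext to send a
         * Delete for each key to the target table.
         */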




[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15438429
  
    --- Diff: external/hbase/src/main/scala/org/apache/spark/hbase/HBaseContext.scala ---
    @@ -0,0 +1,544 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.hbase
    +
    +import org.apache.hadoop.hbase.HBaseConfiguration
    --- End diff --
    
    I'd sort and separate these imports like you can see in other source files.
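
    For reference, a rough sketch of the grouping used elsewhere in Spark (java, then scala, then other third-party, then org.apache.spark), shown with a handful of the imports already in this file:

        import java.io.ByteArrayOutputStream
        import java.util.ArrayList

        import scala.reflect.ClassTag

        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.hbase.HBaseConfiguration
        import org.apache.hadoop.hbase.client.{Delete, Get, Increment, Put, Scan}

        import org.apache.spark.SparkContext
        import org.apache.spark.broadcast.Broadcast
        import org.apache.spark.rdd.RDD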



[GitHub] spark pull request: Spark-2447 : Spark on HBase

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1608#discussion_r15438426
  
    --- Diff: external/hbase/pom.xml ---
    @@ -0,0 +1,217 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +      <modelVersion>4.0.0</modelVersion>
    +        <parent>
    +          <groupId>org.apache.spark</groupId>
    +          <artifactId>spark-parent</artifactId>
    +          <version>1.1.0-SNAPSHOT</version>
    +          <relativePath>../../pom.xml</relativePath>
    +        </parent>
    +
    +        <groupId>org.apache.spark</groupId>
    +        <artifactId>spark-hbase_2.10</artifactId>
    +        <properties>
    +           <sbt.project.name>spark-hbase</sbt.project.name>
    +        </properties>
    +        <packaging>jar</packaging>
    +        <name>Spark Project External Flume</name>
    +        <url>http://spark.apache.org/</url>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.scalatest</groupId>
    +			<artifactId>scalatest_${scala.binary.version}</artifactId>
    +			<version>2.2.0</version>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>com.google.guava</groupId>
    +			<artifactId>guava</artifactId>
    +			<version>14.0.1</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hadoop</groupId>
    +			<artifactId>hadoop-client</artifactId>
    +			<version>2.3.0</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>com.google.guava</groupId>
    +					<artifactId>guava</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hadoop</groupId>
    +			<artifactId>hadoop-common</artifactId>
    +			<version>2.3.0</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hadoop</groupId>
    +			<artifactId>hadoop-common</artifactId>
    +			<version>2.3.0</version>
    +			<type>test-jar</type>
    +			<classifier>tests</classifier>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.spark</groupId>
    +			<artifactId>spark-core_${scala.binary.version}</artifactId>
    +			<version>1.0.0</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>org.eclipse.jetty.orbit</groupId>
    +					<artifactId>javax.servlet</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.spark</groupId>
    +			<artifactId>spark-streaming_${scala.binary.version}</artifactId>
    +			<version>1.0.0</version>
    +			<type>test-jar</type>
    +                        <classifier>tests</classifier>
    +			<scope>test</scope>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.spark</groupId>
    +			<artifactId>spark-streaming_${scala.binary.version}</artifactId>
    +			<version>1.0.0</version>
    +
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-client</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>io.netty</groupId>
    +					<artifactId>netty</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +
    +
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-common</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +			<type>test-jar</type>
    +                        <classifier>tests</classifier>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>javax.servlet.jsp</groupId>
    +					<artifactId>jsp-api</artifactId>
    +				</exclusion>
    +
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-common</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>javax.servlet.jsp</groupId>
    +					<artifactId>jsp-api</artifactId>
    +				</exclusion>
    +
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-server</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>javax.servlet.jsp</groupId>
    +					<artifactId>jsp-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.hbase</groupId>
    +			<artifactId>hbase-server</artifactId>
    +			<version>0.98.1-hadoop2</version>
    +			<type>test-jar</type>
    +                        <classifier>tests</classifier>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>javax.servlet</groupId>
    +					<artifactId>servlet-api</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>javax.servlet.jsp</groupId>
    +					<artifactId>jsp-api</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>javax.servlet</groupId>
    +			<artifactId>javax.servlet-api</artifactId>
    +			<version>3.1.0</version>
    --- End diff --
    
    Servlet 3.1.0 is definitely not something you want to include. Jetty already brings in 3.0.0 and that is the right version for everything. javax.servlet should not be in the build, at least according to how it's set up now.

