You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@predictionio.apache.org by 王斌斌 <he...@163.com> on 2018/05/14 07:41:39 UTC

use hdfs

Hi, 
   I want the temporary model data to be stored in HDFS, not in the local /tmp, so I modified the code like this:
/**
 * ALS matrix-factorization model that persists itself as Spark object files.
 *
 * Adds two `BiMap[String, Int]` lookups to MLlib's `MatrixFactorizationModel`
 * (presumably external string IDs <-> the Int indices used as keys of the
 * feature RDDs -- confirm against the code that builds them).
 *
 * NOTE(review): the HDFS scheme, host and port are hard-coded in `save`, so a
 * change of NameNode address requires editing and recompiling the engine.
 *
 * @param rank             rank passed through to `MatrixFactorizationModel`
 * @param userFeatures     per-user feature vectors keyed by Int index
 * @param productFeatures  per-product feature vectors keyed by Int index
 * @param userStringIntMap user lookup table saved alongside the features
 * @param itemStringIntMap item lookup table saved alongside the features
 */
class ALSModel(
    override val rank: Int,
    override val userFeatures: RDD[(Int, Array[Double])],
    override val productFeatures: RDD[(Int, Array[Double])],
    val userStringIntMap: BiMap[String, Int],
    val itemStringIntMap: BiMap[String, Int])
  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
  with PersistentModel[ALSAlgorithmParams] {

  /**
   * Writes the five model components as object files under
   * `hdfs://predictionspark:9000/tmp/<id>/`.
   *
   * @param id     model instance id; used as the directory name
   * @param params algorithm parameters (not used by this implementation)
   * @param sc     SparkContext used to wrap the non-RDD members in RDDs
   * @return always `true` once all five writes completed without throwing
   */
  def save(id: String, params: ALSAlgorithmParams,
    sc: SparkContext): Boolean = {
    // Single place for the target directory; the resulting paths are
    // identical to spelling the full URI out on every call.
    val base = s"hdfs://predictionspark:9000/tmp/${id}"

    // Scalars and maps are wrapped in one-element RDDs so they can be stored
    // with the same saveAsObjectFile mechanism as the feature RDDs.
    sc.parallelize(Seq(rank)).saveAsObjectFile(s"${base}/rank")
    userFeatures.saveAsObjectFile(s"${base}/userFeatures")
    productFeatures.saveAsObjectFile(s"${base}/productFeatures")
    sc.parallelize(Seq(userStringIntMap))
      .saveAsObjectFile(s"${base}/userStringIntMap")
    sc.parallelize(Seq(itemStringIntMap))
      .saveAsObjectFile(s"${base}/itemStringIntMap")
    true
  }

  /**
   * Short summary: element counts plus the first two entries of each
   * component. Calls `count()` and `take(2)` on both feature RDDs every time
   * it is invoked.
   */
  override def toString: String = {
    s"userFeatures: [${userFeatures.count()}]" +
    s"(${userFeatures.take(2).toList}...)" +
    s" productFeatures: [${productFeatures.count()}]" +
    s"(${productFeatures.take(2).toList}...)" +
    s" userStringIntMap: [${userStringIntMap.size}]" +
    s"(${userStringIntMap.take(2)}...)" +
    s" itemStringIntMap: [${itemStringIntMap.size}]" +
    s"(${itemStringIntMap.take(2)}...)"
  }
}

/**
 * Loader that rebuilds an `ALSModel` from the object files found under
 * `hdfs://predictionspark:9000/tmp/<id>/`.
 *
 * NOTE(review): the HDFS scheme, host and port are hard-coded, so a change of
 * NameNode address requires editing and recompiling the engine.
 */
object ALSModel
  extends PersistentModelLoader[ALSAlgorithmParams, ALSModel] {

  /**
   * Reads the five persisted components and assembles the model.
   *
   * @param id     model instance id; used as the directory name
   * @param params algorithm parameters (not used by this implementation)
   * @param sc     SparkContext to read with; must be defined
   * @return the reconstructed model
   * @throws NoSuchElementException if `sc` is `None`
   */
  def apply(id: String, params: ALSAlgorithmParams,
    sc: Option[SparkContext]): ALSModel = {
    // Same exception type that `sc.get` raises on None, but with a message
    // that says what is actually missing.
    val ctx = sc.getOrElse(
      throw new NoSuchElementException(
        "ALSModel cannot be loaded without a SparkContext (sc was None)"))
    val base = s"hdfs://predictionspark:9000/tmp/${id}"

    new ALSModel(
      // rank and the two maps were stored as one-element RDDs; first() unwraps them.
      rank = ctx.objectFile[Int](s"${base}/rank").first(),
      userFeatures = ctx.objectFile[(Int, Array[Double])](s"${base}/userFeatures"),
      productFeatures = ctx.objectFile[(Int, Array[Double])](s"${base}/productFeatures"),
      userStringIntMap = ctx
        .objectFile[BiMap[String, Int]](s"${base}/userStringIntMap").first(),
      itemStringIntMap = ctx
        .objectFile[BiMap[String, Int]](s"${base}/itemStringIntMap").first())
  }
}


It works.
But why does pio-env.sh say:
# HADOOP_CONF_DIR: You must configure this if you intend to run PredictionIO
#                  with Hadoop 2.
# HADOOP_CONF_DIR=/opt/hadoop


I didn't do this, and it still works. Can someone explain this? And what is HADOOP_CONF_DIR? Is it the directory holding all the configuration files under the Hadoop server's etc/?

Re:Re: use hdfs

Posted by hellomsg <he...@163.com>.
Hi, Donald

 I have added the HADOOP_CONF_DIR configuration, and it works as you described. Thanks so much.



~$cat conf/core-site.xml 

<?xml version="1.0" encoding="UTF-8"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>




<configuration>

    <property>

        <name>fs.defaultFS</name>

        <value>hdfs://predictionspark:9000</value>

    </property>

</configuration>





At 2018-05-18 01:22:07, "Donald Szeto" <do...@apache.org> wrote:

Hi,


Since Spark uses the HDFS API to access files, if you configure HADOOP_CONF_DIR to point to the config files of your Hadoop cluster, Spark will attempt to access files on your HDFS if you leave out the scheme and host name in the URI, i.e. hdfs://predictionspark:9000/ in your case. If HADOOP_CONF_DIR is not configured, Spark will instead attempt to access files locally.


Regardless of whether HADOOP_CONF_DIR is configured, if file URIs are hardcoded with scheme and host name, Spark will always respect them. This is not recommended, because any change to the Hadoop config then requires the engine to be updated and recompiled.


Here's a link to configuring PIO's storage backend: http://predictionio.apache.org/system/anotherdatastore/



Regards,
Donald


On Mon, May 14, 2018 at 12:41 AM, 王斌斌 <he...@163.com> wrote:

Hi, 
   I want the temporary model data to be stored in HDFS, not in the local /tmp, so I modified the code like this:
/**
 * ALS matrix-factorization model that persists itself as Spark object files.
 *
 * Adds two `BiMap[String, Int]` lookups to MLlib's `MatrixFactorizationModel`
 * (presumably external string IDs <-> the Int indices used as keys of the
 * feature RDDs -- confirm against the code that builds them).
 *
 * NOTE(review): the HDFS scheme, host and port are hard-coded in `save`, so a
 * change of NameNode address requires editing and recompiling the engine.
 *
 * @param rank             rank passed through to `MatrixFactorizationModel`
 * @param userFeatures     per-user feature vectors keyed by Int index
 * @param productFeatures  per-product feature vectors keyed by Int index
 * @param userStringIntMap user lookup table saved alongside the features
 * @param itemStringIntMap item lookup table saved alongside the features
 */
class ALSModel(
    override val rank: Int,
    override val userFeatures: RDD[(Int, Array[Double])],
    override val productFeatures: RDD[(Int, Array[Double])],
    val userStringIntMap: BiMap[String, Int],
    val itemStringIntMap: BiMap[String, Int])
  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
  with PersistentModel[ALSAlgorithmParams] {

  /**
   * Writes the five model components as object files under
   * `hdfs://predictionspark:9000/tmp/<id>/`.
   *
   * @param id     model instance id; used as the directory name
   * @param params algorithm parameters (not used by this implementation)
   * @param sc     SparkContext used to wrap the non-RDD members in RDDs
   * @return always `true` once all five writes completed without throwing
   */
  def save(id: String, params: ALSAlgorithmParams,
    sc: SparkContext): Boolean = {
    // Single place for the target directory; the resulting paths are
    // identical to spelling the full URI out on every call.
    val base = s"hdfs://predictionspark:9000/tmp/${id}"

    // Scalars and maps are wrapped in one-element RDDs so they can be stored
    // with the same saveAsObjectFile mechanism as the feature RDDs.
    sc.parallelize(Seq(rank)).saveAsObjectFile(s"${base}/rank")
    userFeatures.saveAsObjectFile(s"${base}/userFeatures")
    productFeatures.saveAsObjectFile(s"${base}/productFeatures")
    sc.parallelize(Seq(userStringIntMap))
      .saveAsObjectFile(s"${base}/userStringIntMap")
    sc.parallelize(Seq(itemStringIntMap))
      .saveAsObjectFile(s"${base}/itemStringIntMap")
    true
  }

  /**
   * Short summary: element counts plus the first two entries of each
   * component. Calls `count()` and `take(2)` on both feature RDDs every time
   * it is invoked.
   */
  override def toString: String = {
    s"userFeatures: [${userFeatures.count()}]" +
    s"(${userFeatures.take(2).toList}...)" +
    s" productFeatures: [${productFeatures.count()}]" +
    s"(${productFeatures.take(2).toList}...)" +
    s" userStringIntMap: [${userStringIntMap.size}]" +
    s"(${userStringIntMap.take(2)}...)" +
    s" itemStringIntMap: [${itemStringIntMap.size}]" +
    s"(${itemStringIntMap.take(2)}...)"
  }
}

/**
 * Loader that rebuilds an `ALSModel` from the object files found under
 * `hdfs://predictionspark:9000/tmp/<id>/`.
 *
 * NOTE(review): the HDFS scheme, host and port are hard-coded, so a change of
 * NameNode address requires editing and recompiling the engine.
 */
object ALSModel
  extends PersistentModelLoader[ALSAlgorithmParams, ALSModel] {

  /**
   * Reads the five persisted components and assembles the model.
   *
   * @param id     model instance id; used as the directory name
   * @param params algorithm parameters (not used by this implementation)
   * @param sc     SparkContext to read with; must be defined
   * @return the reconstructed model
   * @throws NoSuchElementException if `sc` is `None`
   */
  def apply(id: String, params: ALSAlgorithmParams,
    sc: Option[SparkContext]): ALSModel = {
    // Same exception type that `sc.get` raises on None, but with a message
    // that says what is actually missing.
    val ctx = sc.getOrElse(
      throw new NoSuchElementException(
        "ALSModel cannot be loaded without a SparkContext (sc was None)"))
    val base = s"hdfs://predictionspark:9000/tmp/${id}"

    new ALSModel(
      // rank and the two maps were stored as one-element RDDs; first() unwraps them.
      rank = ctx.objectFile[Int](s"${base}/rank").first(),
      userFeatures = ctx.objectFile[(Int, Array[Double])](s"${base}/userFeatures"),
      productFeatures = ctx.objectFile[(Int, Array[Double])](s"${base}/productFeatures"),
      userStringIntMap = ctx
        .objectFile[BiMap[String, Int]](s"${base}/userStringIntMap").first(),
      itemStringIntMap = ctx
        .objectFile[BiMap[String, Int]](s"${base}/itemStringIntMap").first())
  }
}


It works.
But why does pio-env.sh say:
# HADOOP_CONF_DIR: You must configure this if you intend to run PredictionIO
#                  with Hadoop 2.
# HADOOP_CONF_DIR=/opt/hadoop


I didn't do this, and it still works. Can someone explain this? And what is HADOOP_CONF_DIR? Is it the directory holding all the configuration files under the Hadoop server's etc/?




 



Re: use hdfs

Posted by Donald Szeto <do...@apache.org>.
Hi,

Since Spark uses the HDFS API to access files, if you configure
HADOOP_CONF_DIR to point to the config files of your Hadoop cluster, Spark
will attempt to access files on your HDFS if you leave out the scheme and
host name in the URI, i.e. hdfs://predictionspark:9000/ in your case. If
HADOOP_CONF_DIR is not configured, Spark will instead attempt to access
files locally.

Regardless of whether HADOOP_CONF_DIR is configured, if file URIs are
hardcoded with scheme and host name, Spark will always respect them. This is
not recommended, because any change to the Hadoop config then requires the
engine to be updated and recompiled.

Here's a link to configuring PIO's storage backend:
http://predictionio.apache.org/system/anotherdatastore/

Regards,
Donald

On Mon, May 14, 2018 at 12:41 AM, 王斌斌 <he...@163.com> wrote:

> Hi,
>    I want the tmp models datas been sotred in HDFS, not the local /tmp.
> And I modified the code like this:
>
> class ALSModel(
>     override val rank: Int,
>     override val userFeatures: RDD[(Int, Array[Double])],
>     override val productFeatures: RDD[(Int, Array[Double])],
>     val userStringIntMap: BiMap[String, Int],
>     val itemStringIntMap: BiMap[String, Int])
>   extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
>   with PersistentModel[ALSAlgorithmParams] {
>
>   def save(id: String, params: ALSAlgorithmParams,
>     sc: SparkContext): Boolean = {
>
>     sc.parallelize(Seq(rank)).saveAsObjectFile(s"hdfs://predictionspark:9000/tmp/${id}/rank")
>     userFeatures.saveAsObjectFile(s"hdfs://predictionspark:9000/tmp/${id}/userFeatures")
>     productFeatures.saveAsObjectFile(s"hdfs://predictionspark:9000/tmp/${id}/productFeatures")
>     sc.parallelize(Seq(userStringIntMap))
>       .saveAsObjectFile(s"hdfs://predictionspark:9000/tmp/${id}/userStringIntMap")
>     sc.parallelize(Seq(itemStringIntMap))
>       .saveAsObjectFile(s"hdfs://predictionspark:9000/tmp/${id}/itemStringIntMap")
>     true
>   }
>
>   override def toString = {
>     s"userFeatures: [${userFeatures.count()}]" +
>     s"(${userFeatures.take(2).toList}...)" +
>     s" productFeatures: [${productFeatures.count()}]" +
>     s"(${productFeatures.take(2).toList}...)" +
>     s" userStringIntMap: [${userStringIntMap.size}]" +
>     s"(${userStringIntMap.take(2)}...)" +
>     s" itemStringIntMap: [${itemStringIntMap.size}]" +
>     s"(${itemStringIntMap.take(2)}...)"
>   }
> }
>
> object ALSModel
>   extends PersistentModelLoader[ALSAlgorithmParams, ALSModel] {
>   def apply(id: String, params: ALSAlgorithmParams,
>     sc: Option[SparkContext]) = {
>     new ALSModel(
>       rank = sc.get.objectFile[Int](s"hdfs://predictionspark:9000/tmp/${id}/rank").first,
>       userFeatures = sc.get.objectFile(s"hdfs://predictionspark:9000/tmp/${id}/userFeatures"),
>       productFeatures = sc.get.objectFile(s"hdfs://predictionspark:9000/tmp/${id}/productFeatures"),
>       userStringIntMap = sc.get
>         .objectFile[BiMap[String, Int]](s"hdfs://predictionspark:9000/tmp/${id}/userStringIntMap").first,
>       itemStringIntMap = sc.get
>         .objectFile[BiMap[String, Int]](s"hdfs://predictionspark:9000/tmp/${id}/itemStringIntMap").first)
>   }
> }
>
>
> It works.
>
> But why the pio-env.sh says:
>
> # HADOOP_CONF_DIR: You must configure this if you intend to run PredictionIO
> #                  with Hadoop 2.
> # HADOOP_CONF_DIR=/opt/hadoop
>
>
> I don't do this, it also works. So someone can explain this? And what is HADOOP_CONF_DIR? All the configurations in the hadoop server's etc/ ?
>
>
>
>
>