Posted to user@spark.apache.org by Todd Nist <ts...@gmail.com> on 2015/03/19 00:48:40 UTC

[SQL] Elasticsearch-hadoop, exception creating temporary table

I am attempting to access ElasticSearch and expose its data through
SparkSQL using the elasticsearch-hadoop project.  I am encountering the
following exception when trying to create a temporary table from a resource
in ElasticSearch:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at
EsSparkSQL.scala:51, took 0.862184 s
Create Temporary Table for querying
Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)
at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
at io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)
at io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

I have loaded the “accounts.json” sample file from the ElasticSearch
documentation into my ElasticSearch cluster. The mapping looks as follows:

radtech:elastic-search-work tnist$ curl -XGET
'http://localhost:9200/bank/_mapping'
{"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}

I can read the data just fine doing the following:

import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD,SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
    conf.set("es.nodes", ElasticSearch.Nodes)
    conf.set("es.port", ElasticSearch.HttpPort.toString())
    conf.set("es.index.auto.create", "true");
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.executor.memory","1g")
    conf.set("spark.kryoserializer.buffer.mb","256")

    val sparkContext = new SparkContext(conf)

    sparkContext
  }

  def main(args: Array[String]) {

    val sc = sparkInit

    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val start = System.currentTimeMillis()

    /*
     * Read from ES and query with Spark & SparkSQL
     */
    val esData = sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}")

    esData.collect.foreach(println(_))

    val end = System.currentTimeMillis()
    println(s"Total time: ${end-start} ms")

This works fine and prints the content of esData out as one would
expect.

15/03/18 07:54:42 INFO DAGScheduler: Job 0 finished: collect at
ElasticSearchReadWrite.scala:67, took 6.897443 s
(4,Map(employer -> Tourmania, city -> Eastvale, address -> 986 Wyckoff
Avenue, state -> HI, balance -> 27658, age -> 31, gender -> F,
lastname -> Flores, email -> rodriquezflores@tourmania.com, firstname
-> Rodriquez, account_number -> 4))
(9,Map(employer -> Cedward, city -> Olney, address -> 963 Neptune
Avenue, state -> OH, balance -> 24776, age -> 39, gender -> M,
lastname -> Meadows, email -> opalmeadows@cedward.com, firstname ->
Opal, account_number -> 9))...

As does creating a new index and type like this:

    println("read json in and store in ES")
    // read in JSON and store in ES
    val path = "document.json"
    val rdd : SchemaRDD = sqlContext.jsonFile(path)

    rdd.saveToEs("myIndex/myDoc")

However, when I attempt to access the table via the sqlContext like
this I get the exception shown above:

    println("Create Temporary Table for querying")

    val schemaRDD: SchemaRDD = sqlContext.sql(
          "CREATE TEMPORARY TABLE account    " +
          "USING org.elasticsearch.spark.sql " +
          "OPTIONS (resource 'bank/account')  " )
  }
}
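For context, if the CREATE TEMPORARY TABLE statement succeeded, the table
should be queryable through the same sqlContext. A minimal sketch, using
field names from the mapping shown earlier (the query itself is illustrative
and not part of the failing code):

val results = sqlContext.sql(
  "SELECT firstname, lastname, balance FROM account WHERE balance >= 20000")
results.collect().foreach(println)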

I'm using ElasticSearch 1.4.4, spark-1.2.1-bin-hadoop2.4, and the
elasticsearch-hadoop:

"org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"

Any insight on what I am doing wrong?

TIA for the assistance.

-Todd

Re: [SQL] Elasticsearch-hadoop, exception creating temporary table

Posted by Todd Nist <ts...@gmail.com>.
Thanks for the assistance; I found the error and it was something I had done:
PEBCAK.  I had placed a copy of elasticsearch-hadoop 2.1.0.BETA3 in the
project/lib directory, causing it to be picked up as an unmanaged dependency
and brought in first, even though build.sbt had the correct version
specified, 2.1.0.BUILD-SNAPSHOT.

There was no reason for it to be there at all, and it is not something I usually do.
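For anyone chasing a similar NoSuchMethodError, a minimal sketch that makes a
stale jar like this easy to spot: print which jar each side of the mismatch is
actually loaded from (the class names come from the stack trace above; the
check itself is only illustrative):

Seq("org.apache.spark.sql.catalyst.types.StructField",
    "org.elasticsearch.spark.sql.DefaultSource").foreach { name =>
  // getCodeSource is null for classes loaded by the bootstrap class loader
  val src = Class.forName(name).getProtectionDomain.getCodeSource
  println(s"$name -> " + (if (src == null) "unknown (bootstrap)" else src.getLocation))
}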

Thanks again for pointing out that it was a version mismatch issue.

-Todd

RE: [SQL] Elasticsearch-hadoop, exception creating temporary table

Posted by "Cheng, Hao" <ha...@intel.com>.
Todd, can you try running the code in the Spark shell (bin/spark-shell)? You may need to write some stub code that calls the function in MappingUtils.scala. In the meantime, can you also check your project's jar dependency tree, or the downloaded dependency jar files, just in case multiple versions of Spark have been introduced.
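As a sketch of that dependency-tree check (the plugin coordinates and version
below are assumptions to verify against the sbt-dependency-graph README; the
`show` command is built into sbt 0.13 and needs no plugin):

// Hypothetical addition to project/plugins.sbt:
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.5")

// Then, from the sbt prompt:
//   dependencyTree                       // full transitive tree; look for spark-sql / spark-catalyst
//   show compile:dependencyClasspath     // the resolved jars actually placed on the compile classpath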

Re: [SQL] Elasticsearch-hadoop, exception creating temporary table

Posted by Todd Nist <ts...@gmail.com>.
Thanks for the quick response.

The Spark server is spark-1.2.1-bin-hadoop2.4 from the Spark download. Here
is the startup:

radtech>$ ./sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to
/usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../logs/spark-tnist-org.apache.spark.deploy.master.Master-1-radtech.io.out

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Spark Command: java -cp
::/usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/spark-assembly-1.2.1-hadoop2.4.0.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar
-Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m
org.apache.spark.deploy.master.Master --ip radtech.io --port 7077
--webui-port 8080
========================================
15/03/18 20:31:40 INFO Master: Registered signal handlers for [TERM, HUP, INT]
15/03/18 20:31:40 INFO SecurityManager: Changing view acls to: tnist
15/03/18 20:31:40 INFO SecurityManager: Changing modify acls to: tnist
15/03/18 20:31:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tnist); users with modify permissions: Set(tnist)
15/03/18 20:31:41 INFO Slf4jLogger: Slf4jLogger started
15/03/18 20:31:41 INFO Remoting: Starting remoting
15/03/18 20:31:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@radtech.io:7077]
15/03/18 20:31:41 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkMaster@radtech.io:7077]
15/03/18 20:31:41 INFO Utils: Successfully started service 'sparkMaster' on port 7077.
15/03/18 20:31:41 INFO Master: Starting Spark master at spark://radtech.io:7077
15/03/18 20:31:41 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/03/18 20:31:41 INFO MasterWebUI: Started MasterWebUI at http://192.168.1.5:8080
15/03/18 20:31:41 INFO Master: I have been elected leader! New state: ALIVE

My build.sbt for the spark job is as follows:

import AssemblyKeys._
// activating assembly plugin
assemblySettings

name := "elasticsearch-spark"
version := "0.0.1"

val SCALA_VERSION = "2.10.4"val SPARK_VERSION = "1.2.1"

val defaultSettings = Defaults.coreDefaultSettings ++ Seq(
  organization := "io.radtec",
  scalaVersion := SCALA_VERSION,
  resolvers := Seq(
    //"ods-repo" at "http://artifactory.ods:8082/artifactory/repo",
    Resolver.typesafeRepo("releases")),
  scalacOptions ++= Seq(
    "-unchecked",
    "-deprecation",
    "-Xlint",
    "-Ywarn-dead-code",
    "-language:_",
    "-target:jvm-1.7",
    "-encoding",
    "UTF-8"
  ),
  parallelExecution in Test := false,
  testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),
  publishArtifact in (Test, packageBin) := true,
  unmanagedSourceDirectories in Compile <<= (scalaSource in Compile)(Seq(_)),
  unmanagedSourceDirectories in Test <<= (scalaSource in Test)(Seq(_)),
  EclipseKeys.createSrc := EclipseCreateSrc.Default + EclipseCreateSrc.Resource,
  credentials += Credentials(Path.userHome / ".ivy2" / ".credentials"),
  publishTo := Some("Artifactory Realm" at "http://artifactory.ods:8082/artifactory/ivy-repo-local")
)
// custom Hadoop client, configured as provided, since it shouldn't go to assembly jar
val hadoopDeps = Seq (
  "org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided"
)
// ElasticSearch Hadoop support
val esHadoopDeps = Seq (
  ("org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT").
    exclude("org.apache.spark", "spark-core_2.10").
    exclude("org.apache.spark", "spark-streaming_2.10").
    exclude("org.apache.spark", "spark-sql_2.10").
    exclude("javax.jms", "jms")
)

val commonDeps = Seq(
  "com.eaio.uuid"             % "uuid"                  % "3.2",
  "joda-time"                 % "joda-time"             % "2.3",
  "org.joda"                  % "joda-convert"          % "1.6"
)

val jsonDeps = Seq(
  "com.googlecode.json-simple"        % "json-simple"                     % "1.1.1",
  "com.fasterxml.jackson.core"        % "jackson-core"                    % "2.5.1",
  "com.fasterxml.jackson.core"        % "jackson-annotations"             % "2.5.1",
  "com.fasterxml.jackson.core"        % "jackson-databind"                % "2.5.1",
  "com.fasterxml.jackson.module"      % "jackson-module-jaxb-annotations" % "2.5.1",
  "com.fasterxml.jackson.module"     %% "jackson-module-scala"            % "2.5.1",
  "com.fasterxml.jackson.dataformat"  % "jackson-dataformat-xml"          % "2.5.1",
  "com.fasterxml.jackson.datatype"    % "jackson-datatype-joda"           % "2.5.1"
)

val commonTestDeps = Seq(
  "org.specs2"               %% "specs2"                   % "2.3.11"       % "test",
  "org.mockito"               % "mockito-all"              % "1.9.5"        % "test",
  "org.scalacheck"           %% "scalacheck"               % "1.11.3"       % "test",
  "org.scalatest"            %% "scalatest"                % "1.9.1"        % "test"
)
// Project definitions

lazy val root = (project in file("."))
        .settings(defaultSettings:_*)
        .settings(libraryDependencies ++= Seq(
                "com.databricks"           %% "spark-csv"             % "0.1",
                // Spark itself, configured as provided, since it
shouldn't go to assembly jar
                "org.apache.spark"         %% "spark-core"
% SPARK_VERSION   % "provided",
                "org.apache.spark"         %% "spark-streaming"
% SPARK_VERSION   % "provided",
                "org.apache.spark"         %% "spark-sql"
% SPARK_VERSION   % "provided",
                "org.apache.spark"         %% "spark-hive"
% SPARK_VERSION   % "provided",
                ("org.apache.spark"        %% "spark-streaming-kafka"
% SPARK_VERSION).
                        exclude("org.apache.spark", "spark-core_2.10").
                        exclude("org.apache.spark", "spark-streaming_2.10").
                        exclude("org.apache.spark", "spark-sql_2.10").
                        exclude("javax.jms", "jms"),
                "org.apache.spark"         %% "spark-streaming"
% SPARK_VERSION   % "test" classifier "tests",
                "com.typesafe"              % "config"                % "1.2.1",
                "com.typesafe.play"        %% "play-json"             % "2.3.4"
            ) ++ hadoopDeps ++ esHadoopDeps ++ jsonDeps ++
commonTestDeps ++ commonDeps)

resolvers ++= Seq(
  Resolver.sonatypeRepo("snapshots"),
  Resolver.sonatypeRepo("public"),
  "conjars.org" at "http://conjars.org/repo",
  "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
  "Spray Repository" at "http://repo.spray.cc/",
  "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Twitter4J Repository" at "http://twitter4j.org/maven2/",
  "Apache HBase" at "https://repository.apache.org/content/repositories/releases",
  "Twitter Maven Repo" at "http://maven.twttr.com/",
  "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
  "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/"
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
    case m if m.startsWith("META-INF") => MergeStrategy.discard
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", xs @ _*) => MergeStrategy.first
    case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
    case "about.html"  => MergeStrategy.rename
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  }
}

Am I by chance missing an exclude that is bringing in an older version of
Spark into the assembly? Hmm, I need to go look at that.
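One quick way to check that, as a sketch (the assembly jar path below is an
assumption based on the name and version in build.sbt; anything printed means
Spark SQL classes made it into the assembly):

import java.util.zip.ZipFile
import scala.collection.JavaConverters._

val jar = new ZipFile("target/scala-2.10/elasticsearch-spark-assembly-0.0.1.jar")
jar.entries.asScala
  .map(_.getName)
  .filter(n => n.startsWith("org/apache/spark/sql/") && n.endsWith(".class"))
  .take(20)
  .foreach(println)   // Spark SQL classes listed here would shadow the provided Spark
jar.close()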

I am using the SNAPSHOT build of elasticsearch-hadoop as it is built
against Spark 1.2.1.  Per the elasticsearch-hadoop gradle.properties, the
Spark version is set to:

sparkVersion = 1.2.1

Other than possibly missing an exclude that is bringing in an older version of
Spark from somewhere, I do see that I am referencing
"org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided", but I don't
think that is the issue.

Any other thoughts?

-Todd

RE: [SQL] Elasticsearch-hadoop, exception creating temporary table

Posted by "Cheng, Hao" <ha...@intel.com>.
It seems the elasticsearch-hadoop jar was built against an older version of Spark than the one in your execution environment. As far as I know, the definition of StructField changed in Spark 1.2, which would explain the NoSuchMethodError on its constructor. Can you confirm whether the versions match first?
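
If the versions do need to be aligned, here is a minimal build.sbt sketch of what that might look like. It is illustrative only: the connector version and the snapshot repository URL below are assumptions that should be checked against the elasticsearch-hadoop release notes, not a confirmed working combination.

    // build.sbt sketch: keep Spark and the connector on the same Spark minor version.
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"           % "1.2.1" % "provided",
      "org.apache.spark" %% "spark-sql"            % "1.2.1" % "provided",
      // assumption: a 2.1.0 snapshot built against Spark 1.2 -- verify the actual build before use
      "org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"
    )

    // assumption: snapshot builds are published to the Sonatype snapshot repository
    resolvers += "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

A quick way to check for a mismatch is to look at which Spark jars actually end up on the driver and executor classpath (for example in the Environment tab of the Spark UI) and compare that with the Spark version the connector snapshot was compiled against.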

From: Todd Nist [mailto:tsindotg@gmail.com]
Sent: Thursday, March 19, 2015 7:49 AM
To: user@spark.apache.org
Subject: [SQL] Elasticsearch-hadoop, exception creating temporary table



I am attempting to access ElasticSearch and expose its data through SparkSQL using the elasticsearch-hadoop project. I am encountering the following exception when trying to create a temporary table from a resource in ElasticSearch:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at EsSparkSQL.scala:51, took 0.862184 s
Create Temporary Table for querying
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)
at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
at io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)
at io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)



I have loaded the “accounts.json” file from ElasticSearch into my ElasticSearch cluster. The mapping looks as follows:

radtech:elastic-search-work tnist$ curl -XGET 'http://localhost:9200/bank/_mapping'

{"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}

I can read the data just fine doing the following:

import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD,SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
    conf.set("es.nodes", ElasticSearch.Nodes)
    conf.set("es.port", ElasticSearch.HttpPort.toString())
    conf.set("es.index.auto.create", "true");
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.executor.memory","1g")
    conf.set("spark.kryoserializer.buffer.mb","256")

    val sparkContext = new SparkContext(conf)

    sparkContext
  }

  def main(args: Array[String]) {

    val sc = sparkInit

    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val start = System.currentTimeMillis()

    /*
     * Read from ES and query with Spark & SparkSQL
     */
    val esData = sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}")

    esData.collect.foreach(println(_))

    val end = System.currentTimeMillis()
    println(s"Total time: ${end-start} ms")

This works fine and prints the content of esData as one would expect.

15/03/18 07:54:42 INFO DAGScheduler: Job 0 finished: collect at ElasticSearchReadWrite.scala:67, took 6.897443 s

(4,Map(employer -> Tourmania, city -> Eastvale, address -> 986 Wyckoff Avenue, state -> HI, balance -> 27658, age -> 31, gender -> F, lastname -> Flores, email -> rodriquezflores@tourmania.com, firstname -> Rodriquez, account_number -> 4))

(9,Map(employer -> Cedward, city -> Olney, address -> 963 Neptune Avenue, state -> OH, balance -> 24776, age -> 39, gender -> M, lastname -> Meadows, email -> opalmeadows@cedward.com, firstname -> Opal, account_number -> 9))

...

As does creating a new index and type like this:

    println("read json in and store in ES")

    // read in JSON and store in ES

    val path = "document.json"

    val rdd : SchemaRDD = sqlContext.jsonFile(path)



    rdd.saveToEs("myIndex/myDoc")

However, when I attempt to access the table via the sqlContext like this, I get the exception shown above:

    println("Create Temporary Table for querying")



    val schemaRDD: SchemaRDD = sqlContext.sql(

          "CREATE TEMPORARY TABLE account    " +

          "USING org.elasticsearch.spark.sql " +

          "OPTIONS (resource 'bank/account')  " )

  }

}

I'm using ElasticSearch 1.4.4, spark-1.2.1-bin-hadoop2.4, and the following elasticsearch-hadoop dependency:

"org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"

Any insight on what I am doing wrong?

TIA for the assistance.

-Todd