Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/05/25 03:35:32 UTC

[2/5] carbondata git commit: added example for hive carbon integration

added example for hive carbon integration

corrected the root path for HiveExample

changed the carbon store to local

added both Hadoop versions for the Carbon build in the Hive example

removed unwanted dependency from pom.xml

used uppercase letters for DML and DDL keywords

added code for stopping the application programmatically


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/68cafe58
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/68cafe58
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/68cafe58

Branch: refs/heads/master
Commit: 68cafe58e3018731d85b6dbde0037f53953c9309
Parents: c4e396c
Author: anubhav100 <an...@knoldus.in>
Authored: Thu Apr 6 17:47:35 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Thu May 25 11:34:37 2017 +0800

----------------------------------------------------------------------
 integration/hive/pom.xml                        |  69 ++++++-
 .../hive/server/HiveEmbeddedServer2.java        | 159 ++++++++++++++++
 integration/hive/src/main/resources/data.csv    |   3 +
 .../hive/src/main/resources/log4j.properties    |  11 ++
 .../carbondata/hiveexample/HiveExample.scala    | 182 +++++++++++++++++++
 5 files changed, 422 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/68cafe58/integration/hive/pom.xml
----------------------------------------------------------------------
diff --git a/integration/hive/pom.xml b/integration/hive/pom.xml
index 714245e..12ef24a 100644
--- a/integration/hive/pom.xml
+++ b/integration/hive/pom.xml
@@ -64,6 +64,42 @@
             <scope>compile</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <version>3.4.7</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>jline</groupId>
+                    <artifactId>jline</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.carbondata</groupId>
+            <artifactId>carbondata-spark2</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.3.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>4.3-alpha1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>2.6.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libfb303</artifactId>
+            <version>0.9.3</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.carbondata</groupId>
             <artifactId>carbondata-hadoop</artifactId>
             <version>${project.version}</version>
@@ -77,7 +113,7 @@
     <build>
         <resources>
             <resource>
-                <directory>src/resources</directory>
+                <directory>src/main/resources</directory>
             </resource>
         </resources>
         <plugins>
@@ -89,13 +125,42 @@
                 </configuration>
             </plugin>
             <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <version>2.15.2</version>
+                <executions>
+                    <execution>
+                        <id>compile</id>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                        <phase>compile</phase>
+                    </execution>
+                    <execution>
+                        <id>testCompile</id>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                        <phase>test</phase>
+                    </execution>
+                    <execution>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
                 <version>2.18</version>
                 <!-- Note config is repeated in scalatest config -->
                 <configuration>
                     <includes>
-                        <include>**/Test*.java</include>
+                        <include>**/*.java</include>
+                        <include>**/*.scala</include>
+                        <include>**/Test*.java</include>
                         <include>**/*Test.java</include>
                         <include>**/*TestCase.java</include>
                         <include>**/*Suite.java</include>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68cafe58/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java b/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
new file mode 100644
index 0000000..bd94ae9
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hive.server;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.service.Service;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.server.HiveServer2;
+
+/**
+ * Utility starting a local/embedded HiveServer2 for testing purposes.
+ * Uses sensible defaults to properly clean up between reruns.
+ * Additionally, it wrangles the Hive internals so that jobs execute in the current JVM
+ * rather than in a child JVM (which Hive calls "local") or on an external cluster.
+ */
+public class HiveEmbeddedServer2 {
+  private static final String SCRATCH_DIR = "/tmp/hive";
+  private static Log log = LogFactory.getLog(HiveEmbeddedServer2.class);
+  private HiveServer2 hiveServer;
+  private HiveConf config;
+  private int port;
+
+  public void start() throws Exception {
+    log.info("Starting Hive Local/Embedded Server...");
+    if (hiveServer == null) {
+      config = configure();
+      hiveServer = new HiveServer2();
+      port = MetaStoreUtils.findFreePort();
+      config.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, port);
+      hiveServer.init(config);
+      hiveServer.start();
+      waitForStartup();
+    }
+  }
+
+  public int getFreePort() {
+    log.info("Free Port Available is " + port);
+    return port;
+  }
+
+  private void waitForStartup() throws Exception {
+    long timeout = TimeUnit.MINUTES.toMillis(1);
+    long unitOfWait = TimeUnit.SECONDS.toMillis(1);
+
+    CLIService hs2Client = getServiceClientInternal();
+    SessionHandle sessionHandle = null;
+    for (int interval = 0; interval < timeout / unitOfWait; interval++) {
+      Thread.sleep(unitOfWait);
+      try {
+        Map<String, String> sessionConf = new HashMap<String, String>();
+        sessionHandle = hs2Client.openSession("foo", "bar", sessionConf);
+        return;
+      } catch (Exception e) {
+        // service not started yet, keep polling until the timeout expires
+      } finally {
+        // only close a session that was actually opened; closing a null handle
+        // would throw from the finally block and abort the retry loop
+        if (sessionHandle != null) {
+          hs2Client.closeSession(sessionHandle);
+        }
+      }
+    }
+    throw new TimeoutException("Couldn't get a hold of HiveServer2...");
+  }
+
+  private CLIService getServiceClientInternal() {
+    for (Service service : hiveServer.getServices()) {
+      if (service instanceof CLIService) {
+        return (CLIService) service;
+      }
+    }
+    throw new IllegalStateException("Cannot find CLIService");
+  }
+
+  private HiveConf configure() throws Exception {
+    log.info("Setting The Hive Conf Variables");
+    String scratchDir = SCRATCH_DIR;
+
+    File scratchDirFile = new File(scratchDir);
+
+    Configuration cfg = new Configuration();
+    HiveConf conf = new HiveConf(cfg, HiveConf.class);
+    conf.addToRestrictList("columns.comments");
+    conf.set("hive.scratch.dir.permission", "777");
+    conf.setVar(ConfVars.SCRATCHDIRPERMISSION, "777");
+    scratchDirFile.mkdirs();
+    // also set the permissions manually since Hive doesn't do it...
+    scratchDirFile.setWritable(true, false);
+
+    int random = new Random().nextInt();
+
+    conf.set("hive.metastore.warehouse.dir", scratchDir + "/warehouse" + random);
+    conf.set("hive.metastore.metadb.dir", scratchDir + "/metastore_db" + random);
+    conf.set("hive.exec.scratchdir", scratchDir);
+    conf.set("fs.permissions.umask-mode", "022");
+    conf.set("javax.jdo.option.ConnectionURL",
+        "jdbc:derby:;databaseName=" + scratchDir + "/metastore_db" + random + ";create=true");
+    conf.set("hive.metastore.local", "true");
+    conf.set("hive.aux.jars.path", "");
+    conf.set("hive.added.jars.path", "");
+    conf.set("hive.added.files.path", "");
+    conf.set("hive.added.archives.path", "");
+    conf.set("fs.default.name", "file:///");
+
+    // Clear mapred.job.tracker: Hadoop defaults to 'local' if it is undefined, but Hive
+    // expects the property to be set to 'local' explicitly - if it is not, Hive performs a
+    // remote execution (i.e. no child JVM).
+    Field field = Configuration.class.getDeclaredField("properties");
+    field.setAccessible(true);
+    Properties props = (Properties) field.get(conf);
+    props.remove("mapred.job.tracker");
+    props.remove("mapreduce.framework.name");
+    props.setProperty("fs.default.name", "file:///");
+
+    // intercept SessionState's threadlocal ("tss") - note that it is only made
+    // accessible here; the field itself is not modified
+    Field tss = SessionState.class.getDeclaredField("tss");
+    tss.setAccessible(true);
+    return new HiveConf(conf);
+  }
+
+  public void stop() {
+    if (hiveServer != null) {
+      log.info("Stopping Hive Local/Embedded Server...");
+      hiveServer.stop();
+      hiveServer = null;
+      config = null;
+      log.info("Hive Local/Embedded Server Stopped Successfully...");
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68cafe58/integration/hive/src/main/resources/data.csv
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/resources/data.csv b/integration/hive/src/main/resources/data.csv
new file mode 100644
index 0000000..34f96b6
--- /dev/null
+++ b/integration/hive/src/main/resources/data.csv
@@ -0,0 +1,3 @@
+ID,NAME,SALARY
+1,'liang',200000
+2,'anubhav',20000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68cafe58/integration/hive/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/resources/log4j.properties b/integration/hive/src/main/resources/log4j.properties
new file mode 100644
index 0000000..e369916
--- /dev/null
+++ b/integration/hive/src/main/resources/log4j.properties
@@ -0,0 +1,11 @@
+# Root logger option
+log4j.rootLogger=INFO,stdout
+
+# Redirect log messages to console
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68cafe58/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala b/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
new file mode 100644
index 0000000..158cfff
--- /dev/null
+++ b/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hiveexample
+
+import java.io.File
+import java.sql.{DriverManager, ResultSet, SQLException, Statement}
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.hive.server.HiveEmbeddedServer2
+
+object HiveExample {
+
+  private val driverName: String = "org.apache.hive.jdbc.HiveDriver"
+
+  /**
+   * @param args
+   * @throws SQLException
+   */
+  @throws[SQLException]
+  def main(args: Array[String]) {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val store = s"$rootPath/integration/hive/target/store"
+    val warehouse = s"$rootPath/integration/hive/target/warehouse"
+    val metaStore_Db = s"$rootPath/integration/hive/target/carbon_metaStore_db"
+    val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+    import org.apache.spark.sql.CarbonSession._
+
+    System.setProperty("hadoop.home.dir", "/")
+
+    val carbon = SparkSession
+      .builder()
+      .master("local")
+      .appName("HiveExample")
+      .config("carbon.sql.warehouse.dir", warehouse).enableHiveSupport()
+      .getOrCreateCarbonSession(
+        store, metaStore_Db)
+
+    val carbonHadoopJarPath = s"$rootPath/assembly/target/scala-2.11/carbondata_2.11-1.1" +
+                              ".0-incubating-SNAPSHOT-shade-hadoop2.7.2.jar"
+
+    val carbon_DefaultHadoopVersion_JarPath =
+      s"$rootPath/assembly/target/scala-2.11/carbondata_2.11-1.1" +
+      ".0-incubating-SNAPSHOT-shade-hadoop2.2.0.jar"
+
+    val hiveJarPath = s"$rootPath/integration/hive/target/carbondata-hive-1.1" +
+                      ".0-incubating-SNAPSHOT.jar"
+
+    carbon.sql("DROP TABLE IF EXISTS HIVE_CARBON_EXAMPLE")
+
+    carbon
+      .sql(
+        """CREATE TABLE HIVE_CARBON_EXAMPLE (ID int,NAME string,SALARY double) STORED BY
+          |'CARBONDATA' """
+          .stripMargin)
+
+    carbon.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$rootPath/integration/hive/src/main/resources/data.csv'
+         | INTO TABLE HIVE_CARBON_EXAMPLE
+       """.stripMargin)
+    carbon.sql("SELECT * FROM HIVE_CARBON_EXAMPLE").show()
+
+    carbon.stop()
+
+    try {
+      Class.forName(driverName)
+    }
+    catch {
+      case classNotFoundException: ClassNotFoundException =>
+        classNotFoundException.printStackTrace()
+    }
+
+    val hiveEmbeddedServer2 = new HiveEmbeddedServer2()
+    hiveEmbeddedServer2.start()
+    val port = hiveEmbeddedServer2.getFreePort
+    val con = DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default", "", "")
+    val stmt: Statement = con.createStatement
+
+    logger.info(s"============ HIVESERVER2 IS STARTED ON PORT $port ============")
+
+    try {
+      stmt.execute(s"ADD JAR $carbonHadoopJarPath")
+    }
+    catch {
+      case exception: Exception =>
+        logger.warn(s"Jar not found: $carbonHadoopJarPath. " +
+                    "Looking for the Hadoop 2.2.0 version jar instead")
+        try {
+          stmt.execute(s"ADD JAR $carbon_DefaultHadoopVersion_JarPath")
+        }
+        catch {
+          case exception: Exception =>
+            logger.error(s"Neither jar was found: $carbon_DefaultHadoopVersion_JarPath, " +
+                         s"$carbonHadoopJarPath. At least one of them must be built")
+            hiveEmbeddedServer2.stop()
+            System.exit(0)
+        }
+    }
+    try {
+      stmt.execute(s"ADD JAR $hiveJarPath")
+    }
+    catch {
+      case exception: Exception =>
+        logger.error(s"Jar not found: $hiveJarPath")
+        hiveEmbeddedServer2.stop()
+        System.exit(0)
+    }
+    stmt.execute("set hive.mapred.supports.subdirectories=true")
+    stmt.execute("set mapreduce.input.fileinputformat.input.dir.recursive=true")
+
+
+    stmt.execute("CREATE TABLE IF NOT EXISTS HIVE_CARBON_EXAMPLE " +
+                 "(ID int, NAME string, SALARY double)")
+    stmt
+      .execute(
+        "ALTER TABLE HIVE_CARBON_EXAMPLE SET FILEFORMAT INPUTFORMAT \"org.apache.carbondata." +
+        "hive.MapredCarbonInputFormat\" OUTPUTFORMAT \"org.apache.carbondata.hive." +
+        "MapredCarbonOutputFormat\" SERDE \"org.apache.carbondata.hive." +
+        "CarbonHiveSerDe\" ")
+
+    stmt
+      .execute(
+        "ALTER TABLE HIVE_CARBON_EXAMPLE SET LOCATION " +
+        s"'file:///$store/default/hive_carbon_example' ")
+
+
+    val sql = "SELECT * FROM HIVE_CARBON_EXAMPLE"
+
+    val res: ResultSet = stmt.executeQuery(sql)
+
+    var rowsFetched = 0
+
+    while (res.next) {
+      if (rowsFetched == 0) {
+        println("+---+" + "+-------+" + "+--------------+")
+        println("| ID|" + "| NAME  |" + "| SALARY       |")
+        println("+---+" + "+-------+" + "+--------------+")
+      }
+      val resultId = res.getString("ID")
+      val resultName = res.getString("NAME")
+      val resultSalary = res.getString("SALARY")
+
+      println(s"| $resultId |" + s"| $resultName |" + s"| $resultSalary |")
+      println("+---+" + "+-------+" + "+--------------+")
+      rowsFetched = rowsFetched + 1
+    }
+    println(s"****** Total Number Of Rows Fetched: $rowsFetched ******")
+    hiveEmbeddedServer2.stop()
+    System.exit(0)
+  }
+
+}
\ No newline at end of file