Posted to user@ignite.apache.org by Stanislav Lukyanov <st...@gmail.com> on 2018/10/12 17:13:19 UTC

RE: unable to create indexing on dataframe

Hi,

If you want help, you need to explain what your problem is.
Sharing code is great, but people also need to understand what exactly you're trying to do
and what to look for.
Also, your code seems to contain private classes (com.inn.*), so no one will be able to run it anyway.
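For example, a minimal, self-contained setup along these lines (just a sketch; it assumes a
local Spark master and replaces the com.inn.* helpers, which nobody else has) would let
others actually run the code:

    // Sketch: build the SparkSession directly instead of via CreateSparkContext,
    // so there are no private dependencies. "local[*]" is an assumption for a repro.
    SparkSession spark = SparkSession.builder()
            .appName("JoinUsingIgnite")
            .master("local[*]")
            .getOrCreate();

    // Point IgniteContext at whatever Ignite config file you actually use.
    IgniteContext igniteContext = new IgniteContext(spark.sparkContext(),
            "config/default-config.xml");

    IgniteSparkSession igniteSparkSession = new IgniteSparkSession(igniteContext, spark);

A snippet like that, plus the exact error message or stack trace you get, is usually enough.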

Stan

From: shrutika modi
Sent: 4 September 2018 13:25
To: user@ignite.apache.org
Subject: unable to create indexing on dataframe

package com.test;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spark.IgniteContext;
import org.apache.ignite.spark.IgniteDataFrameSettings;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.ignite.IgniteSparkSession;
import org.apache.spark.sql.types.StructField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.inn.sparkrunner.context.JobContext;
import com.inn.sparkrunner.context.JobContextImpl;
import com.inn.sparkrunner.createsparkcontext.CreateSparkContext;
import com.inn.sparkrunner.util.ConfigUtil;

public class JoinUsingIgnite {

	public static void main(String[] args) throws Exception {
		Logger logger = LoggerFactory.getLogger(JoinUsingIgnite.class);

		ConfigUtil.setConfig();
		System.setProperty("IGNITE_HOME", "/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin");

		SparkSession sparkSession = CreateSparkContext.create(args);
		JobContext jobContext = new JobContextImpl(sparkSession);

		TcpDiscoverySpi spi = new TcpDiscoverySpi();
		TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
		ipFinder.setAddresses(Arrays.asList("sf3.start.com", "sf2.start.com"));
		spi.setIpFinder(ipFinder);

		IgniteConfiguration cfg = new IgniteConfiguration();
		// cfg.setIgniteInstanceName("grid");
		cfg.setDiscoverySpi(spi);
		
		Ignite ignite = Ignition.start(cfg);

		IgniteContext igniteContext = new IgniteContext(sparkSession.sparkContext(),
				"/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml");

		IgniteSparkSession igniteSparkSession = new IgniteSparkSession(igniteContext, sparkSession);

		Properties connectionProperties = new Properties();
		connectionProperties.setProperty("user", "FORESIGHT_PRODUCT");
		connectionProperties.setProperty("password", "FORESIGHT_PRODUCT@#^249");
		connectionProperties.setProperty("driver", "com.mysql.jdbc.Driver");

		logger.info("spark start------");

		// Note: "inferschema" is a CSV reader option; the JDBC source always takes the
		// schema from the database, so the option is dropped here.
		Dataset<Row> jdbc = igniteSparkSession.read().jdbc(
				"jdbc:mysql://192.168.4.249:3306/FORESIGHT",
				"(SELECT * FROM NetworkElement) emp",
				connectionProperties);

		Dataset<Row> jdbc1 = igniteSparkSession.read().jdbc(
				"jdbc:mysql://192.168.4.249:3306/FORESIGHT",
				"(SELECT * FROM MacroSiteDetail) emp1",
				connectionProperties);

		
		CacheConfiguration<Object, Object> cacheCfg1 = new CacheConfiguration<>("cachecfg").setSqlSchema("PUBLIC");
		IgniteCache<Object, Object> cache = ignite.getOrCreateCache(cacheCfg1);

		CacheConfiguration<Object, Object> cacheCfg2 = new CacheConfiguration<>("table2").setSqlSchema("PUBLIC");
		IgniteCache<Object, Object> cache2 = ignite.getOrCreateCache(cacheCfg2);
			
		creatingTable(jdbc, cache, "ignitetab1", "networkelementid_pk");
		SqlFieldsQuery insertCmd1 = creatingInsertingCommand(jdbc, cache, "ignitetab1");
		insertData(jdbc, cache, insertCmd1);

		creatingTable(jdbc1, cache2, "ignitetab2", "macrositedetailid_pk");
		SqlFieldsQuery insertCmd2 = creatingInsertingCommand(jdbc1, cache2, "ignitetab2");
		insertData(jdbc1, cache2, insertCmd2);

		Dataset<Row> df1 = igniteSparkSession.read().format(IgniteDataFrameSettings.FORMAT_IGNITE())
				.option(IgniteDataFrameSettings.OPTION_TABLE(), "ignitetab1")
				.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(),
						"/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml")
				.load().repartition(1);
		df1.createOrReplaceTempView("df1");

		Dataset<Row> df2 = igniteSparkSession.read().format(IgniteDataFrameSettings.FORMAT_IGNITE())
				.option(IgniteDataFrameSettings.OPTION_TABLE(), "ignitetab2")
				.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(),
						"/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml")
				.load().repartition(1);
		df2.createOrReplaceTempView("df2");
		
		logger.info("join query");
		Dataset<Row> ignite1 = igniteSparkSession.sql(
				"select * from df1 join df2 on
df1.networkelementid_pk=df2.macrositedetailid_pk");

		
logger.info("join query end");
		logger.info("ignite dataframe count------[{}]",ignite1.count());

		igniteSparkSession.close();
		Ignition.stop(true);
	}

	
	/** Creates a partitioned SQL table from the DataFrame schema and indexes the given column. */
	private static void creatingTable(Dataset<Row> employee, IgniteCache<Object, Object> cache,
			String tempTable, String index) {

		String query = gettingCreateQuery(employee);

		String str = "CREATE TABLE " + tempTable + " (" + query + ") WITH \"template=partitioned\"";
		System.out.println("create query--->" + str);

		cache.query(new SqlFieldsQuery(str).setSchema("PUBLIC")).getAll();

		// Give the index an explicit name and run the DDL in the same schema as the table;
		// the original unnamed, schema-less statement may be why the index creation failed.
		cache.query(new SqlFieldsQuery(
				"CREATE INDEX idx_" + tempTable + " ON " + tempTable + " (" + index + ")")
				.setSchema("PUBLIC")).getAll();
	}

	/** Builds the CREATE TABLE column list from the DataFrame schema; the first column becomes the PK. */
	private static String gettingCreateQuery(Dataset<Row> employee) {

		String str = "";
		StructField[] fields = employee.schema().fields();
		for (int i = 0; i < fields.length; i++) {
			// Map Spark type names to SQL types Ignite understands; "string" and "long"
			// have no direct SQL equivalents (VARCHAR and BIGINT are assumed here).
			String typeName = fields[i].dataType().typeName();
			String datatype;
			if (typeName.equalsIgnoreCase("string"))
				datatype = "VARCHAR";
			else if (typeName.equalsIgnoreCase("long"))
				datatype = "BIGINT";
			else
				datatype = typeName;

			if (i == 0) {
				str = str + fields[i].name() + " " + datatype + " PRIMARY KEY, ";
			} else if (i == fields.length - 1) {
				str = str + fields[i].name() + " " + datatype;
			} else {
				str = str + fields[i].name() + " " + datatype + ", ";
			}
		}

		return str;
	}

	/** Inserts every row of the DataFrame via SQL; note this collects the whole table to the driver. */
	private static void insertData(Dataset<Row> employee, IgniteCache<Object, Object> cache,
			SqlFieldsQuery employeeInsert) {

		List<Row> collectAsList = employee.collectAsList();

		for (Row row : collectAsList) {
			Object[] args = new Object[row.size()];
			for (int j = 0; j < args.length; j++) {
				args[j] = row.get(j);
			}

			cache.query(employeeInsert.setArgs(args).setSchema("PUBLIC")).getAll();
		}
	}

	/** Builds a parameterized INSERT statement matching the DataFrame's columns. */
	private static SqlFieldsQuery creatingInsertingCommand(Dataset<Row> employee,
			IgniteCache<Object, Object> cache, String tempTable) {

		String placeholders = gettingInsertQuery(employee);
		String fields = String.join(", ", employee.columns());

		String str = "INSERT INTO " + tempTable + " (" + fields + ") VALUES (" + placeholders + ")";
		System.out.println("str-------------->" + str);

		return new SqlFieldsQuery(str);
	}

	/** One "?" placeholder per column, comma-separated. */
	private static String gettingInsertQuery(Dataset<Row> employee) {

		String str = "";
		int columnCount = employee.columns().length;
		for (int i = 0; i < columnCount; i++) {
			str = str + (i < columnCount - 1 ? "?, " : "?");
		}

		return str;
	}
}



