You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/06/01 17:41:00 UTC
[11/32] incubator-streams git commit: hive generation is looking
pretty good
hive generation is looking pretty good
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9618adaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9618adaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9618adaf
Branch: refs/feature/STREAMS-389,398
Commit: 9618adaf3f828f9f3682226f666e5732824e4212
Parents: bf31cbe
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Tue May 3 16:32:58 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Tue May 3 16:32:58 2016 -0500
----------------------------------------------------------------------
streams-plugins/streams-plugin-hive/pom.xml | 23 +-
.../plugins/StreamsHiveResourceGenerator.java | 221 -------------
.../StreamsHiveResourceGeneratorMojo.java | 68 ----
.../hive/StreamsHiveGenerationConfig.java | 84 +++++
.../hive/StreamsHiveResourceGenerator.java | 322 +++++++++++++++++++
.../hive/StreamsHiveResourceGeneratorMojo.java | 76 +++++
.../test/StreamsHiveResourceGeneratorTest.java | 88 ++++-
.../src/test/resources/expected/activity.hql | 203 ++++++++++++
.../src/test/resources/expected/collection.hql | 47 +++
.../src/test/resources/expected/media_link.hql | 11 +
.../src/test/resources/expected/object.hql | 61 ++++
.../resources/expected/objectTypes/place.hql | 79 +++++
.../test/resources/expected/verbs/purchase.hql | 203 ++++++++++++
streams-schemas/pom.xml | 37 ++-
.../org/apache/streams/schema/FieldType.java | 13 +
.../org/apache/streams/schema/FieldUtil.java | 29 ++
.../org/apache/streams/schema/FileUtil.java | 69 ++++
.../apache/streams/schema/GenerationConfig.java | 115 +++++++
.../java/org/apache/streams/schema/Schema.java | 57 ++++
.../org/apache/streams/schema/SchemaStore.java | 283 ++++++++++++++++
.../org/apache/streams/schema/SchemaUtil.java | 50 +++
.../java/org/apache/streams/schema/URIUtil.java | 31 ++
22 files changed, 1860 insertions(+), 310 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/pom.xml
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/pom.xml b/streams-plugins/streams-plugin-hive/pom.xml
index 22d75ce..b173a8d 100644
--- a/streams-plugins/streams-plugin-hive/pom.xml
+++ b/streams-plugins/streams-plugin-hive/pom.xml
@@ -40,6 +40,12 @@
<groupId>org.apache.streams</groupId>
<artifactId>streams-config</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-logging</artifactId>
+ <groupId>commons-logging</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
@@ -47,15 +53,13 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.jsonschema2pojo</groupId>
- <artifactId>jsonschema2pojo-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.streams</groupId>
- <artifactId>streams-pojo</artifactId>
+ <artifactId>streams-schemas</artifactId>
<version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
@@ -219,9 +223,10 @@
<goal>unpack-dependencies</goal>
</goals>
<configuration>
- <includeArtifactIds>streams-pojo</includeArtifactIds>
+ <includeGroupIds>org.apache.streams</includeGroupIds>
+ <includeArtifactIds>streams-schemas</includeArtifactIds>
<includes>**/*.json</includes>
- <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+ <outputDirectory>${project.build.directory}/test-classes/streams-schemas</outputDirectory>
</configuration>
</execution>
</executions>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGenerator.java
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGenerator.java b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGenerator.java
deleted file mode 100644
index 1efb15e..0000000
--- a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGenerator.java
+++ /dev/null
@@ -1,221 +0,0 @@
-package org.apache.streams.plugins;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.reflections.ReflectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Created by sblackmon on 11/18/15.
- */
-public class StreamsHiveResourceGenerator implements Runnable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(StreamsHiveResourceGenerator.class);
-
- private final static String LS = System.getProperty("line.separator");
-
- private StreamsHiveResourceGeneratorMojo mojo;
-
- String inDir = "./target/test-classes/activities";
- String outDir = "./target/generated-sources/hive";
-
- public void main(String[] args) {
- StreamsHiveResourceGenerator streamsHiveResourceGenerator = new StreamsHiveResourceGenerator();
- Thread thread = new Thread(streamsHiveResourceGenerator);
- thread.start();
- try {
- thread.join();
- } catch (InterruptedException e) {
- LOGGER.error("InterruptedException", e);
- } catch (Exception e) {
- LOGGER.error("Exception", e);
- }
- return;
- }
-
- public StreamsHiveResourceGenerator(StreamsHiveResourceGeneratorMojo mojo) {
- this.mojo = mojo;
- if ( mojo != null &&
- mojo.getTarget() != null &&
- !Strings.isNullOrEmpty(mojo.getTarget().getAbsolutePath())
- )
- outDir = mojo.getTarget().getAbsolutePath();
-
- if ( mojo != null &&
- mojo.getPackages() != null &&
- mojo.getPackages().length > 0
- )
- packages = mojo.getPackages();
- }
-
- public StreamsHiveResourceGenerator() {
- }
-
- public void run() {
-
- List<File> schemaFiles;
-
-
- List<Class<?>> serializableClasses = detectSerializableClasses();
-
- LOGGER.info("Detected {} serialiables:", serializableClasses.size());
- for( Class clazz : serializableClasses )
- LOGGER.debug(clazz.toString());
-
- List<Class<?>> pojoClasses = detectPojoClasses(serializableClasses);
-
- LOGGER.info("Detected {} pojos:", pojoClasses.size());
- for( Class clazz : pojoClasses ) {
- LOGGER.debug(clazz.toString());
-
- }
-
-
- for( Class clazz : pojoClasses ) {
- String pojoPath = clazz.getPackage().getName().replace(".pojo.json", ".hive").replace(".","/")+"/";
- String pojoName = clazz.getSimpleName()+".hql";
- String pojoHive = renderPojo(clazz);
- writeFile(outDir+"/"+pojoPath+pojoName, pojoHive);
- }
-
- }
-
- private void writeFile(String pojoFile, String pojoHive) {
- try {
- File path = new File(pojoFile);
- File dir = path.getParentFile();
- if( !dir.exists() )
- dir.mkdirs();
- Files.write(Paths.get(pojoFile), pojoHive.getBytes(), StandardOpenOption.CREATE_NEW);
- } catch (Exception e) {
- LOGGER.error("Write Exception: {}", e);
- }
- }
-
- public List<Class<?>> detectSerializableClasses() {
-
- Set<Class<? extends Serializable>> classes =
- reflections.getSubTypesOf(java.io.Serializable.class);
-
- List<Class<?>> result = Lists.newArrayList();
-
- for( Class clazz : classes ) {
- result.add(clazz);
- }
-
- return result;
- }
-
- public List<Class<?>> detectPojoClasses(List<Class<?>> classes) {
-
- List<Class<?>> result = Lists.newArrayList();
-
- for( Class clazz : classes ) {
- try {
- clazz.newInstance().toString();
- } catch( Exception e) {}
- // super-halfass way to know if this is a jsonschema2pojo
- if( clazz.getAnnotations().length >= 1 )
- result.add(clazz);
- }
-
- return result;
- }
-
- public String renderPojo(Class<?> pojoClass) {
- StringBuffer stringBuffer = new StringBuffer();
- stringBuffer.append("CREATE TABLE ");
- stringBuffer.append(pojoClass.getPackage().getName().replace(".pojo.json", ".hive"));
- stringBuffer.append(LS);
- stringBuffer.append("(");
- stringBuffer.append(LS);
-
- Set<Field> fields = ReflectionUtils.getAllFields(pojoClass);
- appendFields(stringBuffer, fields, "", ",");
-
- stringBuffer.append(")");
-
- return stringBuffer.toString();
- }
-
- private void appendFields(StringBuffer stringBuffer, Set<Field> fields, String varDef, String fieldDelimiter) {
- if( fields.size() > 0 ) {
- stringBuffer.append(LS);
- Map<String,Field> fieldsToAppend = uniqueFields(fields);
- for( Iterator<Field> iter = fieldsToAppend.values().iterator(); iter.hasNext(); ) {
- Field field = iter.next();
- stringBuffer.append(name(field));
- stringBuffer.append(": ");
- stringBuffer.append(type(field));
- if( iter.hasNext()) stringBuffer.append(fieldDelimiter);
- stringBuffer.append(LS);
- }
- } else {
- stringBuffer.append(LS);
- }
- }
-
- private String value(Field field) {
- if( field.getName().equals("verb")) {
- return "\"post\"";
- } else if( field.getName().equals("objectType")) {
- return "\"application\"";
- } else return null;
- }
-
- private String type(Field field) {
- if( field.getType().equals(java.lang.String.class)) {
- return "STRING";
- } else if( field.getType().equals(java.lang.Integer.class)) {
- return "INT";
- } else if( field.getType().equals(org.joda.time.DateTime.class)) {
- return "DATE";
- }else if( field.getType().equals(java.util.Map.class)) {
- return "MAP";
- } else if( field.getType().equals(java.util.List.class)) {
- return "ARRAY";
- }
- return field.getType().getCanonicalName().replace(".pojo.json", ".scala");
- }
-
- private Map<String,Field> uniqueFields(Set<Field> fieldset) {
- Map<String,Field> fields = Maps.newTreeMap();
- Field item = null;
- for( Iterator<Field> it = fieldset.iterator(); it.hasNext(); item = it.next() ) {
- if( item != null && item.getName() != null ) {
- Field added = fields.put(item.getName(), item);
- }
- // ensure right class will get used
- }
- return fields;
- }
-
- private String name(Field field) {
- if( field.getName().equals("object"))
- return "obj";
- else return field.getName();
- }
-
- private boolean override(Field field) {
- try {
- if( field.getDeclaringClass().getSuperclass().getField(field.getName()) != null )
- return true;
- else return false;
- } catch( Exception e ) {
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGeneratorMojo.java
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGeneratorMojo.java b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGeneratorMojo.java
deleted file mode 100644
index 1f8c782..0000000
--- a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGeneratorMojo.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.streams.plugins;
-
-import org.apache.maven.plugin.AbstractMojo;
-import org.apache.maven.plugin.MojoExecutionException;
-import org.apache.maven.plugins.annotations.Component;
-import org.apache.maven.plugins.annotations.Execute;
-import org.apache.maven.plugins.annotations.LifecyclePhase;
-import org.apache.maven.plugins.annotations.Mojo;
-import org.apache.maven.plugins.annotations.Parameter;
-import org.apache.maven.project.MavenProject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-
-@Mojo( name = "hive",
- defaultPhase = LifecyclePhase.GENERATE_SOURCES
-)
-@Execute( goal = "hive",
- phase = LifecyclePhase.GENERATE_SOURCES
-)
-public class StreamsHiveResourceGeneratorMojo extends AbstractMojo {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(StreamsHiveResourceGeneratorMojo.class);
-
- @Component
- private MavenProject project;
-
-// @Component
-// private Settings settings;
-//
-// @Parameter( defaultValue = "${localRepository}", readonly = true, required = true )
-// protected ArtifactRepository localRepository;
-//
-// @Parameter( defaultValue = "${plugin}", readonly = true ) // Maven 3 only
-// private PluginDescriptor plugin;
-//
- @Parameter( defaultValue = "${project.basedir}", readonly = true )
- private File basedir;
-
- @Parameter(defaultValue = "${project.build.directory}", readonly = true)
- private File target;
-
- @Parameter(defaultValue = "org.apache.streams.pojo.json", readonly = true)
- private String[] packages;
-
- public void execute() throws MojoExecutionException {
- StreamsHiveResourceGenerator streamsPojoScala = new StreamsHiveResourceGenerator(this);
- Thread thread = new Thread(streamsPojoScala);
- thread.start();
- try {
- thread.join();
- } catch (InterruptedException e) {
- LOGGER.error("InterruptedException", e);
- } catch (Exception e) {
- LOGGER.error("Exception", e);
- }
- return;
- }
-
- public File getTarget() {
- return target;
- }
-
- public String[] getPackages() {
- return packages;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveGenerationConfig.java
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveGenerationConfig.java b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveGenerationConfig.java
new file mode 100644
index 0000000..7e3bf35
--- /dev/null
+++ b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveGenerationConfig.java
@@ -0,0 +1,84 @@
+package org.apache.streams.plugins.hive;
+
+import org.apache.streams.schema.GenerationConfig;
+import org.jsonschema2pojo.DefaultGenerationConfig;
+import org.jsonschema2pojo.util.URLUtil;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Configures StreamsHiveResourceGenerator.
+ *
+ * <p>Schemas are read from {@code sourceDirectory} when it is set; otherwise from every entry in
+ * {@code sourcePaths}. Generated .hql files are written beneath {@code targetDirectory}.
+ * Properties whose names appear in {@code exclusions} are skipped, and nested STRUCT/ARRAY
+ * rendering is bounded by {@code maxDepth}.
+ */
+public class StreamsHiveGenerationConfig extends DefaultGenerationConfig implements GenerationConfig {
+
+    private String sourceDirectory;
+    private List<String> sourcePaths = new ArrayList<String>();
+    private String targetDirectory;
+    private int maxDepth = 1;
+    private Set<String> exclusions = new HashSet<String>();
+
+    /** @return the single source directory, or null when sourcePaths are used instead */
+    public String getSourceDirectory() {
+        return sourceDirectory;
+    }
+
+    public void setSourceDirectory(String sourceDirectory) {
+        this.sourceDirectory = sourceDirectory;
+    }
+
+    /** @return the configured source paths; never null */
+    public List<String> getSourcePaths() {
+        return sourcePaths;
+    }
+
+    /**
+     * Sets the source paths.
+     *
+     * @param sourcePaths source locations; null is treated as an empty list because the
+     *                    generator iterates this list unconditionally
+     */
+    public void setSourcePaths(List<String> sourcePaths) {
+        this.sourcePaths = (sourcePaths != null) ? sourcePaths : new ArrayList<String>();
+    }
+
+    /** @return property names to leave out of generated tables; never null */
+    public Set<String> getExclusions() {
+        return exclusions;
+    }
+
+    /**
+     * Sets the excluded property names.
+     *
+     * @param exclusions names to skip; null is treated as an empty set because the generator
+     *                   calls contains() on it for every property
+     */
+    public void setExclusions(Set<String> exclusions) {
+        this.exclusions = (exclusions != null) ? exclusions : new HashSet<String>();
+    }
+
+    /** @return maximum nesting depth rendered for STRUCT/ARRAY fields */
+    public int getMaxDepth() {
+        return maxDepth;
+    }
+
+    public void setMaxDepth(int maxDepth) {
+        this.maxDepth = maxDepth;
+    }
+
+    public void setTargetDirectory(String targetDirectory) {
+        this.targetDirectory = targetDirectory;
+    }
+
+    /**
+     * Returns the output directory. When none was configured, falls back to the
+     * DefaultGenerationConfig default instead of failing with a NullPointerException.
+     */
+    @Override
+    public File getTargetDirectory() {
+        if (targetDirectory == null) {
+            return super.getTargetDirectory();
+        }
+        return new File(targetDirectory);
+    }
+
+    /**
+     * Returns the schema source locations: only sourceDirectory when it is set, otherwise each
+     * entry of sourcePaths (possibly none).
+     */
+    @Override
+    public Iterator<URL> getSource() {
+        if (sourceDirectory != null) {
+            return Collections.singleton(URLUtil.parseURL(sourceDirectory)).iterator();
+        }
+        List<URL> sourceURLs = new ArrayList<URL>();
+        if (sourcePaths != null) {
+            for (String source : sourcePaths) {
+                sourceURLs.add(URLUtil.parseURL(source));
+            }
+        }
+        return sourceURLs.iterator();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGenerator.java
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGenerator.java b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGenerator.java
new file mode 100644
index 0000000..06c1499
--- /dev/null
+++ b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGenerator.java
@@ -0,0 +1,322 @@
+package org.apache.streams.plugins.hive;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.streams.schema.FieldType;
+import org.apache.streams.schema.FieldUtil;
+import org.apache.streams.schema.FileUtil;
+import org.apache.streams.schema.GenerationConfig;
+import org.apache.streams.schema.Schema;
+import org.apache.streams.schema.SchemaStore;
+import org.apache.streams.schema.SchemaUtil;
+import org.apache.streams.schema.URIUtil;
+import org.jsonschema2pojo.util.URLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URL;
+import java.util.*;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.StringUtils.defaultString;
+import static org.apache.streams.schema.FileUtil.*;
+
+/**
+ * Generates hive table definitions for using org.openx.data.jsonserde.JsonSerDe on new-line delimited json documents.
+ *
+ * <p>Every schema loaded from the configured sources (with a {@code file} URI) yields one .hql file
+ * under the target directory, at the schema's source-relative path with "json" swapped for "hql".
+ * The table is named after that relative path without extension, with '/' replaced by '_'.
+ */
+public class StreamsHiveResourceGenerator implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(StreamsHiveResourceGenerator.class);
+
+    private final static String LS = System.getProperty("line.separator");
+
+    private StreamsHiveGenerationConfig config;
+
+    private SchemaStore schemaStore = new SchemaStore();
+
+    // STRUCT nesting depth of the field being rendered; compared against config.getMaxDepth().
+    private int currentDepth = 0;
+
+    /**
+     * Command-line entry point.
+     *
+     * @param args optional positional arguments: {@code [sourceDirectory [targetDirectory]]}
+     */
+    public static void main(String[] args) {
+        StreamsHiveGenerationConfig config = new StreamsHiveGenerationConfig();
+
+        String sourceDirectory = "./target/test-classes/activities";
+        String targetDirectory = "./target/generated-sources/streams-plugin-hive";
+
+        if (args.length > 0) {
+            sourceDirectory = args[0];
+        }
+        if (args.length > 1) {
+            targetDirectory = args[1];
+        }
+
+        config.setSourceDirectory(sourceDirectory);
+        config.setTargetDirectory(targetDirectory);
+
+        StreamsHiveResourceGenerator streamsHiveResourceGenerator = new StreamsHiveResourceGenerator(config);
+        Thread thread = new Thread(streamsHiveResourceGenerator);
+        thread.start();
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            LOGGER.error("InterruptedException", e);
+            // Restore the interrupt flag so the caller can still observe it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * @param config generation settings; must be non-null by the time {@link #run()} is called
+     */
+    public StreamsHiveResourceGenerator(StreamsHiveGenerationConfig config) {
+        this.config = config;
+    }
+
+    /** Generates resources using the config supplied at construction. */
+    public void run() {
+
+        checkNotNull(config);
+
+        generate(config);
+
+    }
+
+    /**
+     * Resolves every schema reachable from {@code config}'s sources and writes one .hql file per
+     * file-based schema.
+     *
+     * @param config generation settings; also becomes this generator's active config so that
+     *               exclusions and maxDepth used while rendering match the sources processed
+     */
+    public void generate(StreamsHiveGenerationConfig config) {
+
+        this.config = checkNotNull(config);
+
+        LinkedList<File> sourceFiles = new LinkedList<File>();
+
+        for (Iterator<URL> sources = config.getSource(); sources.hasNext();) {
+            URL source = sources.next();
+            sourceFiles.add(URLUtil.getFileFromURL(source));
+        }
+
+        LOGGER.info("Seeded with {} source paths:", sourceFiles.size());
+
+        // Expands directories into the schema files they contain (in place).
+        FileUtil.resolveRecursive((GenerationConfig) config, sourceFiles);
+
+        LOGGER.info("Resolved {} schema files:", sourceFiles.size());
+
+        for (File item : sourceFiles) {
+            schemaStore.create(item.toURI());
+        }
+
+        LOGGER.info("Identified {} objects:", schemaStore.getSize());
+
+        for (Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator(); schemaIterator.hasNext(); ) {
+            Schema schema = schemaIterator.next();
+            currentDepth = 0;
+            // Constant-first comparison: scheme-less (relative) URIs have a null scheme.
+            if ("file".equals(schema.getURI().getScheme())) {
+                String inputFile = schema.getURI().getPath();
+                String resourcePath = dropSourcePathPrefix(inputFile, config.getSourceDirectory());
+                for (String sourcePath : config.getSourcePaths()) {
+                    resourcePath = dropSourcePathPrefix(resourcePath, sourcePath);
+                }
+                String outputFile = config.getTargetDirectory() + "/" + swapExtension(resourcePath, "json", "hql");
+
+                LOGGER.info("Processing {}:", resourcePath);
+
+                String resourceId = dropExtension(resourcePath).replace("/", "_");
+
+                String resourceContent = generateResource(schema, resourceId);
+
+                writeFile(outputFile, resourceContent);
+
+                LOGGER.info("Wrote {}:", outputFile);
+            }
+        }
+    }
+
+    /**
+     * Renders a complete CREATE TABLE statement for {@code schema}.
+     *
+     * @param schema     schema whose properties become columns
+     * @param resourceId table name (backtick-quoted in the output)
+     * @return HQL text whose LOCATION is the {@code ${hiveconf:path}} variable
+     */
+    public String generateResource(Schema schema, String resourceId) {
+        StringBuilder resourceBuilder = new StringBuilder();
+        resourceBuilder.append("CREATE TABLE ");
+        resourceBuilder.append(hqlEscape(resourceId));
+        resourceBuilder.append(LS);
+        resourceBuilder.append("(");
+        resourceBuilder.append(LS);
+        resourceBuilder = appendRootObject(resourceBuilder, schema, resourceId, ' ');
+        resourceBuilder.append(")");
+        resourceBuilder.append(LS);
+        resourceBuilder.append("ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'");
+        resourceBuilder.append(LS);
+        // The property list must be closed, otherwise Hive rejects the statement.
+        resourceBuilder.append("WITH SERDEPROPERTIES (\"ignore.malformed.json\" = \"true\")");
+        resourceBuilder.append(LS);
+        resourceBuilder.append("STORED AS TEXTFILE");
+        resourceBuilder.append(LS);
+        resourceBuilder.append("LOCATION '${hiveconf:path}';");
+        resourceBuilder.append(LS);
+        return resourceBuilder.toString();
+    }
+
+    /**
+     * Appends the top-level column list for {@code schema}; appends nothing if it has no properties.
+     */
+    public StringBuilder appendRootObject(StringBuilder builder, Schema schema, String resourceId, Character seperator) {
+        ObjectNode propertiesNode = schemaStore.resolveProperties(schema, null, resourceId);
+        if (propertiesNode != null && propertiesNode.isObject() && propertiesNode.size() > 0) {
+            builder = appendPropertiesNode(builder, schema, propertiesNode, seperator);
+        }
+        return builder;
+    }
+
+    // Appends "`fieldId`<sep>TYPE" for a scalar field.
+    private StringBuilder appendValueField(StringBuilder builder, Schema schema, String fieldId, FieldType fieldType, Character seperator) {
+        checkNotNull(builder);
+        builder.append(hqlEscape(fieldId));
+        builder.append(seperator);
+        builder.append(hqlType(fieldType));
+        return builder;
+    }
+
+    /**
+     * Appends an array column whose element type comes from {@code itemsNode}; appends nothing
+     * when {@code itemsNode} is null, has no "type", or its type cannot be determined.
+     */
+    public StringBuilder appendArrayItems(StringBuilder builder, Schema schema, String fieldId, ObjectNode itemsNode, Character seperator) {
+        checkNotNull(builder);
+        if (itemsNode == null) return builder;
+        if (itemsNode.has("type")) {
+            try {
+                FieldType itemType = FieldUtil.determineFieldType(itemsNode);
+                switch (itemType) {
+                    case OBJECT:
+                        builder = appendArrayObject(builder, schema, fieldId, itemsNode, seperator);
+                        break;
+                    case ARRAY:
+                        // Nested arrays are flattened to their innermost item type.
+                        ObjectNode subArrayItems = (ObjectNode) itemsNode.get("items");
+                        builder = appendArrayItems(builder, schema, fieldId, subArrayItems, seperator);
+                        break;
+                    default:
+                        builder = appendArrayField(builder, schema, fieldId, itemType, seperator);
+                }
+            } catch (Exception e) {
+                // Best effort: skip the field, but keep the cause for diagnosis.
+                LOGGER.warn("No item type resolvable for {}", fieldId, e);
+            }
+        }
+        checkNotNull(builder);
+        return builder;
+    }
+
+    // Appends "`fieldId`<sep>ARRAY<TYPE>" for an array of scalars.
+    private StringBuilder appendArrayField(StringBuilder builder, Schema schema, String fieldId, FieldType fieldType, Character seperator) {
+        checkNotNull(builder);
+        checkNotNull(fieldId);
+        builder.append(hqlEscape(fieldId));
+        builder.append(seperator);
+        builder.append("ARRAY<" + hqlType(fieldType) + ">");
+        return builder;
+    }
+
+    // Appends "`fieldId`<sep>ARRAY<STRUCT<...>>" for an array of objects.
+    private StringBuilder appendArrayObject(StringBuilder builder, Schema schema, String fieldId, ObjectNode fieldNode, Character seperator) {
+        checkNotNull(builder);
+        checkNotNull(fieldNode);
+        if (!Strings.isNullOrEmpty(fieldId)) {
+            builder.append(hqlEscape(fieldId));
+            builder.append(seperator);
+        }
+        builder.append("ARRAY");
+        builder.append(LS);
+        builder.append("<");
+        builder.append(LS);
+        ObjectNode propertiesNode = schemaStore.resolveProperties(schema, fieldNode, fieldId);
+        builder = appendStructField(builder, schema, "", propertiesNode, ':');
+        builder.append(">");
+        return builder;
+    }
+
+    /**
+     * Appends "`fieldId`<sep>STRUCT<...>" (or a bare STRUCT when fieldId is empty); appends
+     * nothing for an empty properties node. Increments currentDepth while rendering children.
+     */
+    private StringBuilder appendStructField(StringBuilder builder, Schema schema, String fieldId, ObjectNode propertiesNode, Character seperator) {
+        checkNotNull(builder);
+        checkNotNull(propertiesNode);
+
+        if (propertiesNode.isObject() && propertiesNode.size() > 0) {
+
+            currentDepth += 1;
+
+            if (!Strings.isNullOrEmpty(fieldId)) {
+                builder.append(hqlEscape(fieldId));
+                builder.append(seperator);
+            }
+            builder.append("STRUCT");
+            builder.append(LS);
+            builder.append("<");
+            builder.append(LS);
+
+            // Inside STRUCT<...> Hive separates member names from types with ':'.
+            builder = appendPropertiesNode(builder, schema, propertiesNode, ':');
+
+            builder.append(">");
+            builder.append(LS);
+
+            currentDepth -= 1;
+
+        }
+        return builder;
+    }
+
+    /**
+     * Appends one comma-separated column definition per non-excluded property in
+     * {@code propertiesNode}, followed by a line separator.
+     */
+    private StringBuilder appendPropertiesNode(StringBuilder builder, Schema schema, ObjectNode propertiesNode, Character seperator) {
+        checkNotNull(builder);
+        checkNotNull(propertiesNode);
+        Iterator<Map.Entry<String, JsonNode>> fields = propertiesNode.fields();
+        Joiner joiner = Joiner.on("," + LS).skipNulls();
+        List<String> fieldStrings = Lists.newArrayList();
+        while (fields.hasNext()) {
+            Map.Entry<String, JsonNode> field = fields.next();
+            String fieldId = field.getKey();
+            if (config.getExclusions().contains(fieldId) || !field.getValue().isObject()) {
+                continue;
+            }
+            ObjectNode fieldNode = (ObjectNode) field.getValue();
+            FieldType fieldType = FieldUtil.determineFieldType(fieldNode);
+            if (fieldType == null) {
+                continue;
+            }
+            switch (fieldType) {
+                case ARRAY: {
+                    // "items" may be absent or a tuple-style JSON array; only a single item
+                    // schema object is supported, anything else yields no column.
+                    JsonNode items = fieldNode.get("items");
+                    ObjectNode itemsNode = (items instanceof ObjectNode) ? (ObjectNode) items : null;
+                    if (currentDepth <= config.getMaxDepth()) {
+                        StringBuilder arrayItemsBuilder = appendArrayItems(new StringBuilder(), schema, fieldId, itemsNode, seperator);
+                        if (!Strings.isNullOrEmpty(arrayItemsBuilder.toString())) {
+                            fieldStrings.add(arrayItemsBuilder.toString());
+                        }
+                    }
+                    break;
+                }
+                case OBJECT: {
+                    ObjectNode childProperties = schemaStore.resolveProperties(schema, fieldNode, fieldId);
+                    if (currentDepth < config.getMaxDepth()) {
+                        StringBuilder structFieldBuilder = appendStructField(new StringBuilder(), schema, fieldId, childProperties, seperator);
+                        if (!Strings.isNullOrEmpty(structFieldBuilder.toString())) {
+                            fieldStrings.add(structFieldBuilder.toString());
+                        }
+                    }
+                    break;
+                }
+                default: {
+                    StringBuilder valueFieldBuilder = appendValueField(new StringBuilder(), schema, fieldId, fieldType, seperator);
+                    if (!Strings.isNullOrEmpty(valueFieldBuilder.toString())) {
+                        fieldStrings.add(valueFieldBuilder.toString());
+                    }
+                }
+            }
+        }
+        builder.append(joiner.join(fieldStrings)).append(LS);
+        return builder;
+    }
+
+    // Quotes an identifier with backticks so reserved words can be used as column names.
+    private static String hqlEscape(String fieldId) {
+        return "`" + fieldId + "`";
+    }
+
+    // Maps a JSON-schema field type to the Hive type name used in the DDL.
+    private static String hqlType(FieldType fieldType) {
+        switch (fieldType) {
+            case INTEGER:
+                return "INT";
+            case NUMBER:
+                return "FLOAT";
+            case OBJECT:
+                return "STRUCT";
+            default:
+                return fieldType.name().toUpperCase();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGeneratorMojo.java
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGeneratorMojo.java b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGeneratorMojo.java
new file mode 100644
index 0000000..9cf71a9
--- /dev/null
+++ b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGeneratorMojo.java
@@ -0,0 +1,76 @@
+package org.apache.streams.plugins.hive;
+
+import org.apache.maven.plugin.AbstractMojo;
+import org.apache.maven.plugin.MojoExecutionException;
+import org.apache.maven.plugins.annotations.Component;
+import org.apache.maven.plugins.annotations.Execute;
+import org.apache.maven.plugins.annotations.LifecyclePhase;
+import org.apache.maven.plugins.annotations.Mojo;
+import org.apache.maven.plugins.annotations.Parameter;
+import org.apache.maven.project.MavenProject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * Maven goal {@code hive}: generates Hive table definitions from JSON schemas.
+ *
+ * <p>Configure either {@code sourceDirectory} or {@code sourcePaths} (the latter wins when
+ * non-empty) plus {@code targetDirectory}. Generation failures fail the build.
+ */
+@Mojo( name = "hive",
+        defaultPhase = LifecyclePhase.GENERATE_SOURCES
+)
+// NOTE(review): @Execute forking the same goal this mojo implements looks unintended — confirm.
+@Execute( goal = "hive",
+        phase = LifecyclePhase.GENERATE_SOURCES
+)
+public class StreamsHiveResourceGeneratorMojo extends AbstractMojo {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(StreamsHiveResourceGeneratorMojo.class);
+
+    // Injected via expression; @Component is the deprecated way to obtain the MavenProject.
+    @Parameter( defaultValue = "${project}", readonly = true, required = true )
+    private MavenProject project;
+
+    @Parameter( defaultValue = "${project.basedir}", readonly = true )
+    private File basedir;
+
+    // User-configurable parameters: they must not be readonly, or the POM cannot set them.
+    @Parameter( defaultValue = "./src/main/jsonschema" )
+    public String sourceDirectory;
+
+    @Parameter
+    public List<String> sourcePaths;
+
+    @Parameter( defaultValue = "./target/generated-sources/streams-plugin-hive" )
+    public String targetDirectory;
+
+    /**
+     * Builds a {@link StreamsHiveGenerationConfig} from the mojo parameters and runs the generator.
+     *
+     * @throws MojoExecutionException if generation fails
+     */
+    @Override
+    public void execute() throws MojoExecutionException {
+
+        StreamsHiveGenerationConfig config = new StreamsHiveGenerationConfig();
+
+        if (sourcePaths != null && !sourcePaths.isEmpty()) {
+            config.setSourcePaths(sourcePaths);
+        } else {
+            config.setSourceDirectory(sourceDirectory);
+        }
+        config.setTargetDirectory(targetDirectory);
+
+        LOGGER.info("Generating Hive resources into {}", targetDirectory);
+
+        StreamsHiveResourceGenerator generator = new StreamsHiveResourceGenerator(config);
+        try {
+            // Run on the calling thread so failures propagate to Maven instead of dying
+            // silently in a worker thread.
+            generator.run();
+        } catch (RuntimeException e) {
+            throw new MojoExecutionException("Hive resource generation failed", e);
+        }
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java b/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java
index cd80cf8..9be908d 100644
--- a/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java
+++ b/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java
@@ -1,12 +1,23 @@
package org.apache.streams.plugins.test;
-import org.apache.streams.plugins.StreamsHiveResourceGenerator;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.streams.plugins.hive.StreamsHiveGenerationConfig;
+import org.apache.streams.plugins.hive.StreamsHiveResourceGenerator;
+import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.File;
-import java.io.FileFilter;
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.apache.streams.schema.FileUtil.dropSourcePathPrefix;
/**
* Test that Activity beans are compatible with the example activities in the spec.
@@ -21,24 +32,79 @@ public class StreamsHiveResourceGeneratorTest {
* @throws Exception
*/
@Test
- public void testStreamsPojoHive() throws Exception {
- StreamsHiveResourceGenerator streamsHiveResourceGenerator = new StreamsHiveResourceGenerator();
- streamsHiveResourceGenerator.main(new String[0]);
+ public void StreamsHiveResourceGenerator() throws Exception {
+
+ StreamsHiveGenerationConfig config = new StreamsHiveGenerationConfig();
+
+ String sourceDirectory = "target/test-classes/streams-schemas";
+
+ config.setSourceDirectory(sourceDirectory);
+
+ config.setTargetDirectory("target/generated-sources/test");
+
+ config.setExclusions(Sets.newHashSet("attachments"));
+
+ config.setMaxDepth(2);
+
+ StreamsHiveResourceGenerator streamsHiveResourceGenerator = new StreamsHiveResourceGenerator(config);
+ Thread thread = new Thread(streamsHiveResourceGenerator);
+ thread.start();
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ LOGGER.error("InterruptedException", e);
+ } catch (Exception e) {
+ LOGGER.error("Exception", e);
+ }
- File testOutput = new File( "./target/generated-sources/hive/org/apache/streams/hive");
- FileFilter hqlFilter = new FileFilter() {
+ File testOutput = new File( "./target/generated-sources/test");
+ Predicate<File> hqlFilter = new Predicate<File>() {
@Override
- public boolean accept(File pathname) {
- if( pathname.getName().endsWith(".hql") )
+ public boolean apply(@Nullable File file) {
+ if( file.getName().endsWith(".hql") )
return true;
- return false;
+ else return false;
}
};
assert( testOutput != null );
assert( testOutput.exists() == true );
assert( testOutput.isDirectory() == true );
- assert( testOutput.listFiles(hqlFilter).length == 11 );
+
+ Iterable<File> outputIterator = Files.fileTreeTraverser().breadthFirstTraversal(testOutput)
+ .filter(hqlFilter);
+ Collection<File> outputCollection = Lists.newArrayList(outputIterator);
+ assert( outputCollection.size() == 133 );
+
+ String expectedDirectory = "target/test-classes/expected";
+ File testExpected = new File( expectedDirectory );
+
+ Iterable<File> expectedIterator = Files.fileTreeTraverser().breadthFirstTraversal(testExpected)
+ .filter(hqlFilter);
+ Collection<File> expectedCollection = Lists.newArrayList(expectedIterator);
+
+ int fails = 0;
+
+ Iterator<File> iterator = expectedCollection.iterator();
+ while( iterator.hasNext() ) {
+ File objectExpected = iterator.next();
+ String expectedEnd = dropSourcePathPrefix(objectExpected.getAbsolutePath(), expectedDirectory);
+ File objectActual = new File(config.getTargetDirectory() + "/" + expectedEnd);
+ LOGGER.info("Comparing: {} and {}", objectExpected.getAbsolutePath(), objectActual.getAbsolutePath());
+ assert( objectActual.exists());
+ if( FileUtils.contentEquals(objectActual, objectExpected) == true ) {
+ LOGGER.info("Exact Match!");
+ } else {
+ LOGGER.info("No Match!");
+ fails++;
+ }
+ }
+ if( fails > 0 ) {
+ LOGGER.info("Fails: {}", fails);
+ Assert.fail();
+ }
+
+
// assert( new File(testOutput + "/traits").exists() == true );
// assert( new File(testOutput + "/traits").isDirectory() == true );
// assert( new File(testOutput + "/traits").listFiles(scalaFilter) != null );
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/test/resources/expected/activity.hql
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/src/test/resources/expected/activity.hql b/streams-plugins/streams-plugin-hive/src/test/resources/expected/activity.hql
new file mode 100644
index 0000000..49d0f41
--- /dev/null
+++ b/streams-plugins/streams-plugin-hive/src/test/resources/expected/activity.hql
@@ -0,0 +1,203 @@
+CREATE TABLE `activity`
+(
+`id` STRING,
+`actor` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`verb` STRING,
+`object` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`target` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published` STRING,
+`updated` STRING,
+`generator` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`icon` STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`provider` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`title` STRING,
+`content` STRING,
+`url` STRING,
+`links` ARRAY<STRING>
+)
+ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
+WITH SERDEPROPERTIES ("ignore.malformed.json" = "true")
+STORED AS TEXTFILE
+LOCATION '${hiveconf:path}';
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/test/resources/expected/collection.hql
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/src/test/resources/expected/collection.hql b/streams-plugins/streams-plugin-hive/src/test/resources/expected/collection.hql
new file mode 100644
index 0000000..94b986f
--- /dev/null
+++ b/streams-plugins/streams-plugin-hive/src/test/resources/expected/collection.hql
@@ -0,0 +1,47 @@
+CREATE TABLE `collection`
+(
+`url` STRING,
+`totalItems` INT,
+`items` ARRAY
+<
+STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+>
+)
+ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
+WITH SERDEPROPERTIES ("ignore.malformed.json" = "true")
+STORED AS TEXTFILE
+LOCATION '${hiveconf:path}';
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/test/resources/expected/media_link.hql
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/src/test/resources/expected/media_link.hql b/streams-plugins/streams-plugin-hive/src/test/resources/expected/media_link.hql
new file mode 100644
index 0000000..d0afe0f
--- /dev/null
+++ b/streams-plugins/streams-plugin-hive/src/test/resources/expected/media_link.hql
@@ -0,0 +1,11 @@
+CREATE TABLE `media_link`
+(
+`duration` FLOAT,
+`height` INT,
+`width` INT,
+`url` STRING
+)
+ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
+WITH SERDEPROPERTIES ("ignore.malformed.json" = "true")
+STORED AS TEXTFILE
+LOCATION '${hiveconf:path}';
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/test/resources/expected/object.hql
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/src/test/resources/expected/object.hql b/streams-plugins/streams-plugin-hive/src/test/resources/expected/object.hql
new file mode 100644
index 0000000..fb44767
--- /dev/null
+++ b/streams-plugins/streams-plugin-hive/src/test/resources/expected/object.hql
@@ -0,0 +1,61 @@
+CREATE TABLE `object`
+(
+`id` STRING,
+`image` STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName` STRING,
+`summary` STRING,
+`content` STRING,
+`url` STRING,
+`objectType` STRING,
+`author` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published` STRING,
+`updated` STRING,
+`upstreamDuplicates` ARRAY<STRING>,
+`downstreamDuplicates` ARRAY<STRING>
+)
+ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
+WITH SERDEPROPERTIES ("ignore.malformed.json" = "true")
+STORED AS TEXTFILE
+LOCATION '${hiveconf:path}';
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/test/resources/expected/objectTypes/place.hql
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/src/test/resources/expected/objectTypes/place.hql b/streams-plugins/streams-plugin-hive/src/test/resources/expected/objectTypes/place.hql
new file mode 100644
index 0000000..b861fad
--- /dev/null
+++ b/streams-plugins/streams-plugin-hive/src/test/resources/expected/objectTypes/place.hql
@@ -0,0 +1,79 @@
+CREATE TABLE `objectTypes_place`
+(
+`id` STRING,
+`image` STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName` STRING,
+`summary` STRING,
+`content` STRING,
+`url` STRING,
+`objectType` STRING,
+`author` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published` STRING,
+`updated` STRING,
+`upstreamDuplicates` ARRAY<STRING>,
+`downstreamDuplicates` ARRAY<STRING>,
+`address` STRUCT
+<
+`post-office-box`:STRING,
+`extended-address`:STRING,
+`street-address`:STRING,
+`locality`:STRING,
+`region`:STRING,
+`postal-code`:STRING,
+`country-name`:STRING
+>
+,
+`position` STRUCT
+<
+`altitude`:FLOAT,
+`latitude`:FLOAT,
+`longitude`:FLOAT
+>
+
+)
+ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
+WITH SERDEPROPERTIES ("ignore.malformed.json" = "true")
+STORED AS TEXTFILE
+LOCATION '${hiveconf:path}';
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/test/resources/expected/verbs/purchase.hql
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/src/test/resources/expected/verbs/purchase.hql b/streams-plugins/streams-plugin-hive/src/test/resources/expected/verbs/purchase.hql
new file mode 100644
index 0000000..4a05269
--- /dev/null
+++ b/streams-plugins/streams-plugin-hive/src/test/resources/expected/verbs/purchase.hql
@@ -0,0 +1,203 @@
+CREATE TABLE `verbs_purchase`
+(
+`id` STRING,
+`actor` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`verb` STRING,
+`object` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`target` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published` STRING,
+`updated` STRING,
+`generator` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`icon` STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`provider` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`title` STRING,
+`content` STRING,
+`url` STRING,
+`links` ARRAY<STRING>
+)
+ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
+WITH SERDEPROPERTIES ("ignore.malformed.json" = "true")
+STORED AS TEXTFILE
+LOCATION '${hiveconf:path}';
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/pom.xml
----------------------------------------------------------------------
diff --git a/streams-schemas/pom.xml b/streams-schemas/pom.xml
index d64359a..0625cb6 100644
--- a/streams-schemas/pom.xml
+++ b/streams-schemas/pom.xml
@@ -29,9 +29,44 @@
<artifactId>streams-schemas</artifactId>
<name>${project.artifactId}</name>
- <description>Activity Streams schemas</description>
+ <description>Activity Streams schemas and schema utilities</description>
+ <dependencies>
+ <!-- NOTE(review): FileUtil and SchemaStore import org.slf4j, which is not declared in this block; presumably inherited from the parent POM - confirm. -->
+ <!-- NOTE(review): none of the schema classes shown in this commit import commons-io; confirm it is still needed (URIUtil is not shown). -->
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <!-- StringUtils, used by SchemaStore. -->
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <!-- Jackson tree model (JsonNode, ObjectNode), used by FieldUtil, Schema and SchemaStore. -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <!-- ContentResolver / FragmentResolver, used by SchemaStore. -->
+ <!-- NOTE(review): the reason for excluding commons-logging is not stated; presumably to keep a single logging facade - confirm. -->
+ <dependency>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- Preconditions / Strings (FileUtil) and Optional / Iterables / Lists / Ordering (SchemaStore). -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ </dependencies>
<build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
<resources>
<resource>
<directory>src/main/resources</directory>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/FieldType.java
----------------------------------------------------------------------
diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/FieldType.java b/streams-schemas/src/main/java/org/apache/streams/schema/FieldType.java
new file mode 100644
index 0000000..1ad9dcc
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/FieldType.java
@@ -0,0 +1,13 @@
+package org.apache.streams.schema;
+
+/**
+ * The JSON Schema primitive types understood by the schema utilities.
+ *
+ * <p>Each constant corresponds to one value of the JSON Schema {@code "type"} keyword;
+ * {@link FieldUtil#determineFieldType} performs the mapping.
+ */
+public enum FieldType {
+ STRING, INTEGER, NUMBER, BOOLEAN, OBJECT, ARRAY
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/FieldUtil.java
----------------------------------------------------------------------
diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/FieldUtil.java b/streams-schemas/src/main/java/org/apache/streams/schema/FieldUtil.java
new file mode 100644
index 0000000..5f83767
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/FieldUtil.java
@@ -0,0 +1,29 @@
+package org.apache.streams.schema;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Created by steve on 5/1/16.
+ */
+public class FieldUtil {
+ public static FieldType determineFieldType(ObjectNode fieldNode) {
+ String typeSchemaField = "type";
+ if( !fieldNode.has(typeSchemaField))
+ return null;
+ String typeSchemaFieldValue = fieldNode.get(typeSchemaField).asText();
+ if( typeSchemaFieldValue.equals("string")) {
+ return FieldType.STRING;
+ } else if( typeSchemaFieldValue.equals("integer")) {
+ return FieldType.INTEGER;
+ } else if( typeSchemaFieldValue.equals("number")) {
+ return FieldType.NUMBER;
+ } else if( typeSchemaFieldValue.equals("object")) {
+ return FieldType.OBJECT;
+ } else if( typeSchemaFieldValue.equals("boolean")) {
+ return FieldType.BOOLEAN;
+ } else if( typeSchemaFieldValue.equals("array")) {
+ return FieldType.ARRAY;
+ }
+ else return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/FileUtil.java
----------------------------------------------------------------------
diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/FileUtil.java b/streams-schemas/src/main/java/org/apache/streams/schema/FileUtil.java
new file mode 100644
index 0000000..6b171f3
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/FileUtil.java
@@ -0,0 +1,69 @@
+package org.apache.streams.schema;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Created by steve on 5/1/16.
+ */
+public class FileUtil {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(FileUtil.class);
+
+ public static String dropSourcePathPrefix(String inputFile, String sourceDirectory) {
+ if(Strings.isNullOrEmpty(sourceDirectory))
+ return inputFile;
+ else if( inputFile.contains(sourceDirectory) ) {
+ return inputFile.substring(inputFile.indexOf(sourceDirectory)+sourceDirectory.length()+1);
+ } else return inputFile;
+ }
+
+ public static String swapExtension(String inputFile, String originalExtension, String newExtension) {
+ if(inputFile.endsWith("."+originalExtension))
+ return inputFile.replace("."+originalExtension, "."+newExtension);
+ else return inputFile;
+ }
+
+ public static String dropExtension(String inputFile) {
+ if(inputFile.contains("."))
+ return inputFile.substring(0, inputFile.lastIndexOf("."));
+ else return inputFile;
+ }
+
+ public static void writeFile(String resourceFile, String resourceContent) {
+ try {
+ File path = new File(resourceFile);
+ File dir = path.getParentFile();
+ if( !dir.exists() )
+ dir.mkdirs();
+ Files.write(Paths.get(resourceFile), resourceContent.getBytes(), StandardOpenOption.CREATE_NEW);
+ } catch (Exception e) {
+ LOGGER.error("Write Exception: {}", e);
+ }
+ }
+
+ public static void resolveRecursive(GenerationConfig config, List<File> schemaFiles) {
+
+ Preconditions.checkArgument(schemaFiles.size() > 0);
+ int i = 0;
+ while( schemaFiles.size() > i) {
+ File child = schemaFiles.get(i);
+ if (child.isDirectory()) {
+ schemaFiles.addAll(Arrays.asList(child.listFiles(config.getFileFilter())));
+ schemaFiles.remove(child);
+ } else {
+ i += 1;
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/GenerationConfig.java
----------------------------------------------------------------------
diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/GenerationConfig.java b/streams-schemas/src/main/java/org/apache/streams/schema/GenerationConfig.java
new file mode 100644
index 0000000..e0469c9
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/GenerationConfig.java
@@ -0,0 +1,115 @@
+package org.apache.streams.schema;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.net.URL;
+import java.util.Iterator;
+
+/**
+ * Configuration consumed by schema-driven resource generation
+ * (for example {@link FileUtil#resolveRecursive}, which reads {@link #getFileFilter()}).
+ */
+public interface GenerationConfig {
+
+ /**
+  * Returns the schema inputs.
+  *
+  * @return locations of the JSON Schema file(s) or directory(ies) to read
+  */
+ Iterator<URL> getSource();
+
+ /**
+  * Returns the output root.
+  *
+  * @return directory that generated output is written into; it need not exist beforehand
+  */
+ File getTargetDirectory();
+
+ /**
+  * Returns the output character encoding.
+  *
+  * @return name of the character encoding to use when writing generated files
+  */
+ String getOutputEncoding();
+
+ /**
+  * Returns the filter applied while scanning source directories.
+  *
+  * @return filter that selects schema files (and directories to descend into)
+  */
+ FileFilter getFileFilter();
+
+ /**
+  * Returns whether 'additional properties' support is enabled.
+  *
+  * @return {@code false} to disable 'additional properties' support in generated objects,
+  *     regardless of what the input schemas declare
+  */
+ boolean isIncludeAdditionalProperties();
+
+ /**
+  * Returns the target version.
+  *
+  * @return target version for the generated output
+  */
+ String getTargetVersion();
+
+ // Options considered but not yet declared here: includeDynamicAccessors, dateTimeType,
+ // dateType and timeType.
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/Schema.java
----------------------------------------------------------------------
diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/Schema.java b/streams-schemas/src/main/java/org/apache/streams/schema/Schema.java
new file mode 100644
index 0000000..ea75ffd
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/Schema.java
@@ -0,0 +1,57 @@
+package org.apache.streams.schema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.net.URI;
+
+/**
+ * A JSON Schema document.
+ */
+public class Schema {
+
+ private final URI id;
+ private final URI uri;
+ private final JsonNode content;
+ private final Schema parent;
+ private final boolean generate;
+
+ public Schema(URI uri, JsonNode content, Schema parent, boolean generate) {
+ this.uri = uri;
+ this.content = content;
+ this.parent = parent;
+ this.generate = generate;
+ this.id = content.has("id") ? URI.create(content.get("id").asText()) : null;
+ }
+
+ public URI getId() {
+ return id;
+ }
+
+ public URI getURI() {
+ return uri;
+ }
+
+ public JsonNode getContent() {
+ return content;
+ }
+
+ public JsonNode getParentContent() {
+ if( parent != null )
+ return parent.getContent();
+ else return null;
+ }
+
+ public URI getParentURI() {
+ if( parent != null ) return parent.getURI();
+ else return null;
+ }
+
+ public boolean isGenerated() {
+ return generate;
+ }
+
+ public Schema getParent() {
+ return parent;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/SchemaStore.java
----------------------------------------------------------------------
diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/SchemaStore.java b/streams-schemas/src/main/java/org/apache/streams/schema/SchemaStore.java
new file mode 100644
index 0000000..e612aff
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/SchemaStore.java
@@ -0,0 +1,283 @@
+package org.apache.streams.schema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import org.apache.commons.lang3.StringUtils;
+import org.jsonschema2pojo.ContentResolver;
+import org.jsonschema2pojo.FragmentResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.schema.URIUtil.safeResolve;
+
+/**
+ * Created by steve on 4/30/16.
+ */
+public class SchemaStore extends Ordering<Schema> {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(SchemaStore.class);
+ private final static JsonNodeFactory NODE_FACTORY = JsonNodeFactory.instance;
+
+ protected Map<URI, Schema> schemas = new HashMap();
+ protected FragmentResolver fragmentResolver = new FragmentResolver();
+ protected ContentResolver contentResolver = new ContentResolver();
+
+ public SchemaStore() {
+ }
+
+ public synchronized Schema create(URI uri) {
+ if(!getByUri(uri).isPresent()) {
+ URI baseURI = URIUtil.removeFragment(uri);
+ JsonNode baseNode = this.contentResolver.resolve(baseURI);
+ if(uri.toString().contains("#") && !uri.toString().endsWith("#")) {
+ Schema newSchema = new Schema(baseURI, baseNode, null, true);
+ this.schemas.put(baseURI, newSchema);
+ JsonNode childContent = this.fragmentResolver.resolve(baseNode, '#' + StringUtils.substringAfter(uri.toString(), "#"));
+ this.schemas.put(uri, new Schema(uri, childContent, newSchema, false));
+ } else {
+ if( baseNode.has("extends") && baseNode.get("extends").isObject()) {
+ URI ref = URI.create(((ObjectNode)baseNode.get("extends")).get("$ref").asText());
+ URI absoluteURI;
+ if( ref.isAbsolute())
+ absoluteURI = ref;
+ else
+ absoluteURI = baseURI.resolve(ref);
+ JsonNode parentNode = this.contentResolver.resolve(absoluteURI);
+ Schema parentSchema = null;
+ if( this.schemas.get(absoluteURI) != null ) {
+ parentSchema = this.schemas.get(absoluteURI);
+ } else {
+ parentSchema = create(absoluteURI);
+ }
+ this.schemas.put(uri, new Schema(uri, baseNode, parentSchema, true));
+ } else {
+ this.schemas.put(uri, new Schema(uri, baseNode, null, true));
+ }
+ }
+ List<JsonNode> refs = baseNode.findValues("$ref");
+ for( JsonNode ref : refs ) {
+ if( ref.isValueNode() ) {
+ String refVal = ref.asText();
+ URI refURI = null;
+ try {
+ refURI = URI.create(refVal);
+ } catch( Exception e ) {
+ LOGGER.info("Exception: {}", e.getMessage());
+ }
+ if (refURI != null && !getByUri(refURI).isPresent()) {
+ if (refURI.isAbsolute())
+ create(refURI);
+ else
+ create(baseURI.resolve(refURI));
+ }
+ }
+ }
+ }
+
+ return this.schemas.get(uri);
+ }
+
+ public Schema create(Schema parent, String path) {
+ if(path.equals("#")) {
+ return parent;
+ } else {
+ path = StringUtils.stripEnd(path, "#?&/");
+ URI id = parent != null && parent.getId() != null?parent.getId().resolve(path):URI.create(path);
+ if(this.selfReferenceWithoutParentFile(parent, path)) {
+ this.schemas.put(id, new Schema(id, this.fragmentResolver.resolve(parent.getParentContent(), path), parent, false));
+ return this.schemas.get(id);
+ } else {
+ return this.create(id);
+ }
+ }
+ }
+
+ protected boolean selfReferenceWithoutParentFile(Schema parent, String path) {
+ return parent != null && (parent.getId() == null || parent.getId().toString().startsWith("#/")) && path.startsWith("#/");
+ }
+
+ public synchronized void clearCache() {
+ this.schemas.clear();
+ }
+
+ public Integer getSize() {
+ return schemas.size();
+ }
+
+ public Optional<Schema> getById(URI id) {
+ for( Schema schema : schemas.values() ) {
+ if( schema.getId() != null && schema.getId().equals(id) )
+ return Optional.of(schema);
+ }
+ return Optional.absent();
+ }
+
+ public Optional<Schema> getByUri(URI uri) {
+ for( Schema schema : schemas.values() ) {
+ if( schema.getURI().equals(uri) )
+ return Optional.of(schema);
+ }
+ return Optional.absent();
+ }
+
+ public Integer getFileUriCount() {
+ int count = 0;
+ for( Schema schema : schemas.values() ) {
+ if( schema.getURI().getScheme().equals("file") )
+ count++;
+ }
+ return count;
+ }
+
+ public Integer getHttpUriCount() {
+ int count = 0;
+ for( Schema schema : schemas.values() ) {
+ if( schema.getURI().getScheme().equals("http") )
+ count++;
+ }
+ return count;
+ }
+
+ public Iterator<Schema> getSchemaIterator() {
+ List<Schema> schemaList = Lists.newArrayList(schemas.values());
+ Collections.sort(schemaList, this);
+ return schemaList.iterator();
+ }
+
+ public ObjectNode resolveProperties(Schema schema, ObjectNode fieldNode, String resourceId) {
+ // this should return something more suitable like:
+ // Map<String, Pair<Schema, ObjectNode>>
+ ObjectNode schemaProperties = NODE_FACTORY.objectNode();
+ ObjectNode parentProperties = NODE_FACTORY.objectNode();
+ if (fieldNode == null) {
+ ObjectNode schemaContent = (ObjectNode) schema.getContent();
+ if( schemaContent.has("properties") ) {
+ schemaProperties = (ObjectNode) schemaContent.get("properties");
+ if (schema.getParentContent() != null) {
+ ObjectNode parentContent = (ObjectNode) schema.getParentContent();
+ if (parentContent.has("properties")) {
+ parentProperties = (ObjectNode) parentContent.get("properties");
+ }
+ }
+ }
+ } else if (fieldNode != null && fieldNode.size() > 0) {
+ if( fieldNode.has("properties") && fieldNode.get("properties").isObject() && fieldNode.get("properties").size() > 0 )
+ schemaProperties = (ObjectNode) fieldNode.get("properties");
+ URI parentURI = null;
+ if( fieldNode.has("$ref") || fieldNode.has("extends") ) {
+ JsonNode refNode = fieldNode.get("$ref");
+ JsonNode extendsNode = fieldNode.get("extends");
+ if (refNode != null && refNode.isValueNode())
+ parentURI = URI.create(refNode.asText());
+ else if (extendsNode != null && extendsNode.isObject())
+ parentURI = URI.create(extendsNode.get("$ref").asText());
+ ObjectNode parentContent = null;
+ URI absoluteURI;
+ if (parentURI.isAbsolute())
+ absoluteURI = parentURI;
+ else {
+ absoluteURI = schema.getURI().resolve(parentURI);
+ if (!absoluteURI.isAbsolute() || (absoluteURI.isAbsolute() && !getByUri(absoluteURI).isPresent() ))
+ absoluteURI = schema.getParentURI().resolve(parentURI);
+ }
+ if (absoluteURI != null && absoluteURI.isAbsolute()) {
+ if (getByUri(absoluteURI).isPresent())
+ parentContent = (ObjectNode) getByUri(absoluteURI).get().getContent();
+ if (parentContent != null && parentContent.isObject() && parentContent.has("properties")) {
+ parentProperties = (ObjectNode) parentContent.get("properties");
+ } else if (absoluteURI.getPath().endsWith("#properties")) {
+ absoluteURI = URI.create(absoluteURI.toString().replace("#properties", ""));
+ parentProperties = (ObjectNode) getByUri(absoluteURI).get().getContent().get("properties");
+ }
+ }
+ }
+
+
+ }
+
+ ObjectNode resolvedProperties = NODE_FACTORY.objectNode();
+ if (parentProperties != null && parentProperties.size() > 0)
+ resolvedProperties = SchemaUtil.mergeProperties(schemaProperties, parentProperties);
+ else resolvedProperties = schemaProperties.deepCopy();
+
+ return resolvedProperties;
+ }
+
+ @Override
+ public int compare(Schema left, Schema right) {
+ // are they the same?
+ if( left.equals(right)) return 0;
+ // is one an ancestor of the other
+ Schema candidateAncestor = left;
+ while( candidateAncestor.getParent() != null ) {
+ candidateAncestor = candidateAncestor.getParent();
+ if( candidateAncestor.equals(right))
+ return 1;
+ }
+ candidateAncestor = right;
+ while( candidateAncestor.getParent() != null ) {
+ candidateAncestor = candidateAncestor.getParent();
+ if( candidateAncestor.equals(left))
+ return -1;
+ }
+ // does one have a field that reference the other?
+ for( JsonNode refNode : left.getContent().findValues("$ref") ) {
+ String refText = refNode.asText();
+ Optional<URI> resolvedURI = safeResolve(left.getURI(), refText);
+ if( resolvedURI.isPresent() && resolvedURI.get().equals(right.getURI()))
+ return 1;
+ }
+ for( JsonNode refNode : right.getContent().findValues("$ref") ) {
+ String refText = refNode.asText();
+ Optional<URI> resolvedURI = safeResolve(right.getURI(), refText);
+ if( resolvedURI.isPresent() && resolvedURI.get().equals(left.getURI()))
+ return -1;
+ }
+ // does one have a field that reference a third schema that references the other?
+ for( JsonNode refNode : left.getContent().findValues("$ref") ) {
+ String refText = refNode.asText();
+ Optional<URI> possibleConnectorURI = safeResolve(left.getURI(), refText);
+ if( possibleConnectorURI.isPresent()) {
+ Optional<Schema> possibleConnector = getByUri(possibleConnectorURI.get());
+ if (possibleConnector.isPresent()) {
+ for (JsonNode connectorRefNode : possibleConnector.get().getContent().findValues("$ref")) {
+ String connectorRefText = connectorRefNode.asText();
+ Optional<URI> resolvedURI = safeResolve(possibleConnector.get().getURI(), connectorRefText);
+ if (resolvedURI.isPresent() && resolvedURI.get().equals(right.getURI()))
+ return 1;
+ }
+ }
+ }
+ }
+ for( JsonNode refNode : right.getContent().findValues("$ref") ) {
+ String refText = refNode.asText();
+ Optional<URI> possibleConnectorURI = safeResolve(right.getURI(), refText);
+ if( possibleConnectorURI.isPresent()) {
+ Optional<Schema> possibleConnector = getByUri(possibleConnectorURI.get());
+ if (possibleConnector.isPresent()) {
+ for (JsonNode connectorRefNode : possibleConnector.get().getContent().findValues("$ref")) {
+ String connectorRefText = connectorRefNode.asText();
+ Optional<URI> resolvedURI = safeResolve(possibleConnector.get().getURI(), connectorRefText);
+ if (resolvedURI.isPresent() && resolvedURI.get().equals(left.getURI()))
+ return -1;
+ }
+ }
+ }
+ }
+ return 0;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/SchemaUtil.java b/streams-schemas/src/main/java/org/apache/streams/schema/SchemaUtil.java
new file mode 100644
index 0000000..3e3b300
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/SchemaUtil.java
@@ -0,0 +1,50 @@
+package org.apache.streams.schema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.jsonschema2pojo.util.NameHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URL;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+/**
+ * Created by steve on 4/30/16.
+ */
+public class SchemaUtil {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(SchemaUtil.class);
+ private static final JsonNodeFactory NODE_FACTORY = JsonNodeFactory.instance;
+
+ public static String childQualifiedName(String parentQualifiedName, String childSimpleName) {
+ String safeChildName = childSimpleName.replaceAll(NameHelper.ILLEGAL_CHARACTER_REGEX, "_");
+ return isEmpty(parentQualifiedName) ? safeChildName : parentQualifiedName + "." + safeChildName;
+ }
+
+ public static ObjectNode readSchema(URL schemaUrl) {
+
+ ObjectNode schemaNode = NODE_FACTORY.objectNode();
+ schemaNode.put("$ref", schemaUrl.toString());
+ return schemaNode;
+
+ }
+
+ public static ObjectNode mergeProperties(ObjectNode content, ObjectNode parent) {
+
+ ObjectNode merged = parent.deepCopy();
+ Iterator<Map.Entry<String, JsonNode>> fields = content.fields();
+ for( ; fields.hasNext(); ) {
+ Map.Entry<String, JsonNode> field = fields.next();
+ String fieldId = field.getKey();
+ merged.put(fieldId, field.getValue().deepCopy());
+ }
+ return merged;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/URIUtil.java
----------------------------------------------------------------------
diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/URIUtil.java b/streams-schemas/src/main/java/org/apache/streams/schema/URIUtil.java
new file mode 100644
index 0000000..04e8904
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/URIUtil.java
@@ -0,0 +1,31 @@
+package org.apache.streams.schema;
+
+import com.google.common.base.Optional;
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Created by sblackmon on 5/1/16.
+ */
+public class URIUtil {
+
+ public static URI removeFragment(URI id) {
+ return URI.create(StringUtils.substringBefore(id.toString(), "#"));
+ }
+
+ public static URI removeFile(URI id) {
+ return URI.create(StringUtils.substringBeforeLast(id.toString(), "/"));
+ }
+
+ public static Optional<URI> safeResolve(URI absolute, String relativePart) {
+ if( !absolute.isAbsolute()) return Optional.absent();
+ try {
+ return Optional.of(absolute.resolve(relativePart));
+ } catch( IllegalArgumentException e ) {
+ return Optional.absent();
+ }
+ }
+
+}