You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2019/07/22 15:16:44 UTC

[streams] branch master updated: STREAMS-647 AS 2.0 RDF serialization in streams-processor-fullcontact

This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git


The following commit(s) were added to refs/heads/master by this push:
     new 8328cb3  STREAMS-647 AS 2.0 RDF serialization in streams-processor-fullcontact
     new 4c024f8  Merge pull request #485 from steveblackmon/STREAMS-647
8328cb3 is described below

commit 8328cb3ba5f24ea0696e782cc76b111d65da9907
Author: Steve Blackmon <sb...@apache.org>
AuthorDate: Tue Jul 16 12:00:44 2019 -0500

    STREAMS-647 AS 2.0 RDF serialization in streams-processor-fullcontact
    
    https://issues.apache.org/jira/browse/STREAMS-647
---
 .../src/main/resources/fullcontact.ttl             | 107 ++++
 .../fullcontact/FullContactSocialGraph.scala       | 164 +++++++
 .../fullcontact/PersonEnrichmentProcessor.scala    | 175 +++++++
 .../fullcontact/util/AffinityOrdering.scala        |  38 ++
 .../fullcontact/util/FullContactUtils.scala        | 537 +++++++++++++++++++++
 5 files changed, 1021 insertions(+)

diff --git a/streams-contrib/streams-processor-fullcontact/src/main/resources/fullcontact.ttl b/streams-contrib/streams-processor-fullcontact/src/main/resources/fullcontact.ttl
new file mode 100644
index 0000000..7275e00
--- /dev/null
+++ b/streams-contrib/streams-processor-fullcontact/src/main/resources/fullcontact.ttl
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Namespace prefixes used by this vocabulary.
+# `xsd:` is the prefix the property ranges in this file actually reference; `xs:` is
+# kept as an alias of the same namespace so existing references keep resolving.
+# The Dublin Core namespaces end in `/` (no trailing `#`).
+PREFIX : <http://streams.apache.org/fullcontact#>
+PREFIX as: <http://www.w3.org/ns/activitystreams#>
+PREFIX dc: <http://purl.org/dc/elements/1.1/>
+PREFIX dct: <http://purl.org/dc/terms/>
+PREFIX owl: <http://www.w3.org/2002/07/owl#>
+PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
+PREFIX vcard: <http://www.w3.org/2006/vcard/ns#>
+PREFIX xs: <http://www.w3.org/2001/XMLSchema#>
+PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
+BASE <http://streams.apache.org/fullcontact#>
+
+# Resource used as the rdfs:domain of the summary-level properties in this file.
+# The `;` after the type statement is required to continue the predicate list.
+:PersonSummary a owl:Thing ;
+  rdfs:comment "PersonSummary"@en ;
+  rdfs:label "PersonSummary"@en .
+
+# Datatype properties whose rdfs:domain is :PersonSummary.
+# Each one is declared with an English label, an English comment and range xsd:string.
+
+# Age range of the contact.
+:ageRange a owl:DatatypeProperty ;
+  rdfs:label "ageRange"@en ;
+  rdfs:comment "Age range of the contact."@en ;
+  rdfs:domain :PersonSummary ;
+  rdfs:range xsd:string .
+
+# URL of the contact's photo (declared as a plain string, not an IRI).
+:avatar a owl:DatatypeProperty ;
+  rdfs:label "avatar"@en ;
+  rdfs:comment "URL of the contact's photo."@en ;
+  rdfs:domain :PersonSummary ;
+  rdfs:range xsd:string .
+
+# Free-text biography.
+:bio a owl:DatatypeProperty ;
+  rdfs:label "bio"@en ;
+  rdfs:comment "Biography of the contact."@en ;
+  rdfs:domain :PersonSummary ;
+  rdfs:range xsd:string .
+
+# Email address; the comment marks it as a queryable field.
+:email a owl:DatatypeProperty ;
+  rdfs:label "email"@en ;
+  rdfs:comment "The email address of the contact. (Queryable)"@en ;
+  rdfs:domain :PersonSummary ;
+  rdfs:range xsd:string .
+
+# Current or most recent employer name.
+:organization a owl:DatatypeProperty ;
+  rdfs:label "organization"@en ;
+  rdfs:comment "Current or most recent place of work."@en ;
+  rdfs:domain :PersonSummary ;
+  rdfs:range xsd:string .
+
+# Current or most recent job title.
+:title a owl:DatatypeProperty ;
+  rdfs:label "title"@en ;
+  rdfs:comment "Current or most recent job title."@en ;
+  rdfs:domain :PersonSummary ;
+  rdfs:range xsd:string .
+
+# Phone number; the comment marks it as a queryable field.
+:phone a owl:DatatypeProperty ;
+  rdfs:label "phone"@en ;
+  rdfs:comment "Phone number of the contact. (Queryable)"@en ;
+  rdfs:domain :PersonSummary ;
+  rdfs:range xsd:string .
+
+# Last-updated timestamp. NOTE(review): typed as xsd:string although the comment
+# describes a date-time; confirm whether xsd:dateTime was intended.
+:updated a owl:DatatypeProperty ;
+  rdfs:label "updated"@en ;
+  rdfs:comment "Date-time last updated."@en ;
+  rdfs:domain :PersonSummary ;
+  rdfs:range xsd:string .
+
+# URL of the contact's website (declared as a plain string, not an IRI).
+:website a owl:DatatypeProperty ;
+  rdfs:label "website"@en ;
+  rdfs:comment "URL of the contact's website."@en ;
+  rdfs:domain :PersonSummary ;
+  rdfs:range xsd:string .
+
+# Resource used as the rdfs:range of :details and the rdfs:domain of :gender.
+# The `;` after the type statement is required to continue the predicate list.
+:PersonDetails a owl:Thing ;
+  rdfs:comment "PersonDetails"@en ;
+  rdfs:label "PersonDetails"@en .
+
+# Object property linking a :PersonSummary to its :PersonDetails.
+:details a owl:ObjectProperty ;
+  rdfs:label "details"@en ;
+  rdfs:comment "When included, additional details about the contact provided through Data Add-ons will be available here."@en ;
+  rdfs:domain :PersonSummary ;
+  rdfs:range :PersonDetails .
+
+# NOTE(review): :website is declared a second time here with exactly the same triples
+# as its earlier declaration in this file; the repetition adds nothing to the graph.
+:website a owl:DatatypeProperty ;
+  rdfs:label "website"@en ;
+  rdfs:comment "URL of the contact's website."@en ;
+  rdfs:domain :PersonSummary ;
+  rdfs:range xsd:string .
+
+# Gender of the contact; unlike the properties above its domain is :PersonDetails.
+:gender a owl:DatatypeProperty ;
+  rdfs:label "gender"@en ;
+  rdfs:comment "Gender of the contact."@en ;
+  rdfs:domain :PersonDetails ;
+  rdfs:range xsd:string .
+
diff --git a/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/FullContactSocialGraph.scala b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/FullContactSocialGraph.scala
new file mode 100644
index 0000000..929ff09
--- /dev/null
+++ b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/FullContactSocialGraph.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.fullcontact
+
+import java.io.InputStream
+import java.io.OutputStream
+import java.io.{BufferedInputStream, File, FileInputStream, FileOutputStream, PrintStream}
+import java.util.Scanner
+import java.util.concurrent.Callable
+
+import com.google.common.base.Preconditions
+import com.typesafe.config.Config
+import org.apache.juneau.json.JsonParser
+import org.apache.streams.config.StreamsConfigurator
+import org.apache.streams.fullcontact.FullContactSocialGraph.FullContactSocialGraphStats
+import org.apache.streams.fullcontact.FullContactSocialGraph.typesafe
+import org.apache.streams.fullcontact.pojo.PersonSummary
+import org.apache.streams.fullcontact.util.FullContactUtils
+
+import scala.io.Source
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+
+/**
+  * Command-line entry point and result type for producing an Activity Streams 2.0
+  * social graph from a file of FullContact response data.
+  */
+object FullContactSocialGraph {
+
+  /** Configuration subtree read from `org.apache.streams.fullcontact.FullContactSocialGraph`. */
+  lazy val typesafe: Config = StreamsConfigurator.getConfig.getConfig("org.apache.streams.fullcontact.FullContactSocialGraph")
+
+  /**
+    * To use from command line:
+    *
+    * <p/>
+    * java -cp streams-dist-jar-with-dependencies.jar -Dconfig.file=application.conf org.apache.streams.fullcontact.FullContactSocialGraph
+    *
+    * <p/>
+    * Input stream should contain a series of json-serialized `PersonSummary` objects, one per line.
+    *
+    * <p/>
+    * Output stream will contain a TTL-serialized social graph.
+    *
+    * <p/>
+    * Input to the process is:
+    *   A file if the configuration subtree of this object contains an 'input' key
+    *   stdin otherwise
+    *
+    * Output from the process is:
+    *   A file if the configuration subtree of this object contains an 'output' key
+    *   stdout otherwise
+    *
+    * <p/>
+    * NOTE(review): earlier documentation also mentioned -Dinput= / -Doutput= system properties;
+    * whether those reach this configuration subtree depends on StreamsConfigurator - confirm.
+    *
+    * @see org.apache.streams.fullcontact.FullContactSocialGraph
+    * @param args command-line arguments (not read; all settings come from configuration)
+    * @throws Exception wrapping whatever failure the job raised
+    */
+  @throws[Exception]
+  final def main(args: Array[String]): Unit = {
+
+    val inputStream: InputStream =
+      if (typesafe.hasPath("input")) new FileInputStream(new File(typesafe.getString("input")))
+      else System.in
+
+    val outputStream: OutputStream =
+      if (typesafe.hasPath("output")) new FileOutputStream(new File(typesafe.getString("output")))
+      else System.out
+
+    try {
+      // Constructing the job does no work by itself: call() has to be invoked to
+      // read the input and write the graph.
+      Try(new FullContactSocialGraph(inputStream, outputStream).call()) match {
+        case Success(_) => ()
+        case Failure(t) => throw new Exception(t)
+      }
+    } finally {
+      outputStream.flush()
+      // Only close streams this method opened; the process-wide stdin/stdout are left alone.
+      if (inputStream ne System.in) inputStream.close()
+      if (outputStream ne System.out) outputStream.close()
+    }
+  }
+
+  /**
+    * Element counts of the collections computed during one run of the job.
+    * Each field is the size of the identically named collection built in `call()`.
+    */
+  case class FullContactSocialGraphStats(
+                                          inputLines : Int,
+                                          personSummaries : Int,
+                                          allOrganizations : Int,
+                                          allInterestItems : Int,
+                                          uniqueInterests : Int,
+                                          topicHierarchy : Int,
+                                          allProfiles : Int,
+                                          allProfileRelationships : Int,
+                                          allEmploymentItems : Int,
+                                          uniqueEmployers : Int,
+                                          allUrlRelationships : Int,
+                                          allImageRelationships : Int
+  )
+}
+
+/**
+  * Reads json-serialized `PersonSummary` records (one per line) from `in`, writes their
+  * Turtle serialization to `out`, and reports how many items of each kind were derived.
+  *
+  * @param in  stream of newline-delimited json documents
+  * @param out stream receiving the Turtle output
+  */
+class FullContactSocialGraph(in: InputStream, out: OutputStream ) extends Callable[FullContactSocialGraphStats] {
+
+  val inputStream: BufferedInputStream = new BufferedInputStream(in)
+  val outputStream: PrintStream = new PrintStream(out)
+
+  /**
+    * Runs the conversion.
+    *
+    * Every intermediate result is materialised into a strict collection: iterators
+    * are single-use, so a collection that has been written out (or handed to another
+    * helper) as an iterator would otherwise report a size of zero in the stats.
+    *
+    * @return counts of every collection computed during the run
+    */
+  override def call() : FullContactSocialGraphStats = {
+
+    val inputLines = Source.fromInputStream(inputStream).getLines().toVector
+
+    // sequence of all PersonSummary
+    val personSummaries = inputLines.map(JsonParser.DEFAULT.parse(_, classOf[PersonSummary]))
+
+    // PersonSummary derived sequences
+    val allOrganizations = FullContactUtils.allOrganizationItems(personSummaries.toIterator).toVector
+    val allInterestItems = FullContactUtils.allInterestItems(personSummaries.toIterator).toVector
+    val uniqueInterests = FullContactUtils.uniqueInterests(allInterestItems.toIterator).toVector
+    val topicHierarchy = FullContactUtils.topicHierarchy(allInterestItems.toIterator).toVector
+
+    val allProfiles = FullContactUtils.allProfiles(personSummaries.toIterator).toVector
+    val allProfileRelationships = FullContactUtils.allProfileRelationships(personSummaries.toIterator).toVector
+
+    // employment data is only counted, not serialized
+    val allEmploymentItems = FullContactUtils.allEmploymentItems(personSummaries.toIterator).toVector
+    val uniqueEmployers = FullContactUtils.uniqueEmployers(allEmploymentItems.toIterator).toVector
+
+    val allUrlRelationships = FullContactUtils.allUrlRelationships(personSummaries.toIterator).toVector
+    val allImageRelationships = FullContactUtils.allImageRelationships(personSummaries.toIterator).toVector
+
+    // The safe_* helpers return None for items that fail to serialize; flatMap drops those.
+    personSummaries.flatMap(FullContactUtils.safe_personSummaryAsTurtle).foreach(outputStream.println(_))
+    allOrganizations.flatMap(FullContactUtils.safe_organizationAsTurtle).foreach(outputStream.println(_))
+
+    uniqueInterests.flatMap(FullContactUtils.safe_interestTopicAsTurtle).foreach(outputStream.println(_))
+    topicHierarchy.map(FullContactUtils.topicRelationshipAsTurtle).foreach(outputStream.println(_))
+
+    allUrlRelationships.flatMap(FullContactUtils.safe_urlRelationshipAsTurtle).foreach(outputStream.println(_))
+    allImageRelationships.flatMap(FullContactUtils.safe_imageRelationshipAsTurtle).foreach(outputStream.println(_))
+
+    allProfiles.flatMap(FullContactUtils.safe_profileAsTurtle).foreach(outputStream.println(_))
+    allProfileRelationships.flatMap(FullContactUtils.safe_personProfileRelationshipAsTurtle).foreach(outputStream.println(_))
+
+    outputStream.flush()
+
+    FullContactSocialGraphStats(
+      inputLines = inputLines.size,
+      personSummaries = personSummaries.size,
+      allOrganizations = allOrganizations.size,
+      allInterestItems = allInterestItems.size,
+      uniqueInterests = uniqueInterests.size,
+      topicHierarchy = topicHierarchy.size,
+      allProfiles = allProfiles.size,
+      allProfileRelationships = allProfileRelationships.size,
+      allEmploymentItems = allEmploymentItems.size,
+      uniqueEmployers = uniqueEmployers.size,
+      allUrlRelationships = allUrlRelationships.size,
+      allImageRelationships = allImageRelationships.size
+    )
+  }
+}
diff --git a/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/PersonEnrichmentProcessor.scala b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/PersonEnrichmentProcessor.scala
new file mode 100644
index 0000000..15d06c9
--- /dev/null
+++ b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/PersonEnrichmentProcessor.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.fullcontact
+
+import java.io.BufferedInputStream
+import java.io.BufferedOutputStream
+import java.io.File
+import java.io.FileInputStream
+import java.io.FileOutputStream
+import java.io.PrintStream
+import java.util.Scanner
+
+import com.typesafe.config.Config
+import org.apache.streams.config.ComponentConfigurator
+import org.apache.streams.config.StreamsConfigurator
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.core.StreamsProcessor
+import org.apache.streams.fullcontact.api.EnrichPersonRequest
+import org.apache.streams.fullcontact.config.FullContactConfiguration
+import org.apache.streams.fullcontact.pojo.PersonSummary
+
+import scala.collection.JavaConversions._
+import scala.io.Source
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+
+/**
+  * Command-line entry point and streaming helpers for enriching a series of
+  * person requests.
+  */
+object PersonEnrichmentProcessor {
+
+  /** Configuration subtree read from `org.apache.streams.fullcontact.PersonEnrichmentProcessor`. */
+  lazy val typesafe: Config = StreamsConfigurator.getConfig.getConfig("org.apache.streams.fullcontact.PersonEnrichmentProcessor")
+
+  /**
+    * To use from command line:
+    *
+    * <p/>
+    * Supply (at least) the following required configuration in application.conf:
+    *
+    * <p/>
+    * org.apache.streams.fullcontact.config.FullContactConfiguration.token = ""
+    *
+    * <p/>
+    * Launch syntax:
+    *
+    * <p/>
+    * java -cp streams-dist-jar-with-dependencies.jar -Dconfig.file=./application.conf org.apache.streams.fullcontact.PersonEnrichmentProcessor
+    *
+    * <p/>
+    * Input to the process is:
+    *   A file if the configuration subtree of this object contains an 'input' key
+    *   stdin otherwise
+    *
+    * Output from the process is:
+    *   A file if the configuration subtree of this object contains an 'output' key
+    *   stdout otherwise
+    *
+    * <p/>
+    * Each input line becomes one datum; each datum produced by the processor is
+    * printed as one output line.
+    *
+    * @see org.apache.streams.fullcontact.api.EnrichPersonRequest
+    * @param args command-line arguments (not read; all settings come from configuration)
+    * @throws Exception Exception
+    */
+  @throws[Exception]
+  final def main(args: Array[String]): Unit = {
+
+    val inputStream = if (typesafe.hasPath("input")) {
+      new BufferedInputStream(new FileInputStream(new File(typesafe.getString("input"))))
+    } else System.in
+
+    val outputStream = if (typesafe.hasPath("output")) {
+      new PrintStream(new FileOutputStream(new File(typesafe.getString("output"))))
+    } else System.out
+
+    val input = Source.fromInputStream(inputStream)
+    val outStream = new PrintStream(new BufferedOutputStream(outputStream))
+
+    try {
+      val inputDatums = input.getLines().map(entry => new StreamsDatum(entry))
+      val outputDatums = streamDatums(inputDatums)
+      val outputLines = outputDatums.map(_.getDocument().asInstanceOf[String])
+
+      for( line <- outputLines ) {
+        outStream.println(line)
+      }
+    } finally {
+      // Release both ends even when a request fails part-way through, so buffered
+      // output already produced is not lost.
+      outStream.flush()
+      outStream.close()
+      input.close()
+    }
+  }
+
+  /**
+    * Lazily enriches every request in `iter`.
+    *
+    * @param iter      requests to enrich
+    * @param processor enrichment client; defaults to `FullContact.getInstance()`
+    * @return iterator of the summaries returned by `processor.enrichPerson`
+    */
+  def stream( iter : Iterator[EnrichPersonRequest] )
+             ( implicit processor : PersonEnrichment = FullContact.getInstance() ) : Iterator[PersonSummary] = {
+    iter.map( item => processor.enrichPerson(item) )
+  }
+
+  /**
+    * Lazily runs every datum in `iter` through `processor.process` and flattens the results.
+    *
+    * @param iter      datums to process
+    * @param processor processor to apply; defaults to a newly constructed `PersonEnrichmentProcessor`
+    * @return iterator over all datums emitted by the processor
+    */
+  def streamDatums( iter : Iterator[StreamsDatum] )
+            ( implicit processor : PersonEnrichmentProcessor = new PersonEnrichmentProcessor() ) : Iterator[StreamsDatum] = {
+    iter.flatMap( item => processor.process(item) )
+  }
+
+  /** `stream` exposed as a function value, using its default enrichment client. */
+  def processor : Iterator[EnrichPersonRequest] => Iterator[PersonSummary] = {
+    stream(_)
+  }
+
+}
+
+/**
+  * StreamsProcessor that turns `EnrichPersonRequest`s (or their json form) into
+  * serialized `PersonSummary` documents using a FullContact client.
+  *
+  * @param config FullContact configuration; detected via `ComponentConfigurator` when omitted
+  */
+class PersonEnrichmentProcessor(config : FullContactConfiguration = new ComponentConfigurator[FullContactConfiguration](classOf[FullContactConfiguration]).detectConfiguration())
+  extends StreamsProcessor with Serializable {
+
+  var personEnrichment = FullContact.getInstance(config)
+
+  /**
+    * Process/Analyze the {@link org.apache.streams.core.StreamsDatum} and return the the StreamsDatums that will
+    * passed to every down stream operation that reads from this processor.
+    *
+    * The datum's document may be an `EnrichPersonRequest` or a String that the
+    * client's parser can turn into one; any other document type raises an Exception.
+    * A failed enrichment call, or one that yields no summary, produces an empty list.
+    *
+    * @param entry StreamsDatum to be processed
+    * @return resulting StreamDatums from processing. Should never be null or contain null object.  Empty list OK.
+    */
+  override def process(entry: StreamsDatum): java.util.List[StreamsDatum] = {
+    val request : EnrichPersonRequest = entry.getDocument match {
+      case typed : EnrichPersonRequest => typed
+      case json : String => personEnrichment.parser.parse(json, classOf[EnrichPersonRequest])
+      case _ => throw new Exception("invalid input type")
+    }
+    Try(personEnrichment.enrichPerson(request)) match {
+      case Success(summary : PersonSummary) =>
+        List(new StreamsDatum(personEnrichment.serializer.serialize(summary)))
+      // Covers Failure and also a successful call that returned null, which the
+      // typed pattern above does not match.
+      case _ => List.empty[StreamsDatum]
+    }
+  }
+
+  /**
+    * Each operation must publish an identifier.
+    */
+  override def getId: String = "PersonEnrichmentProcessor"
+
+  /**
+    * This method will be called after initialization/serialization. Initialize any non-serializable objects here.
+    *
+    * When `configurationObject` is a `FullContactConfiguration` the client is rebuilt from it.
+    * For anything else (including null) the existing client is kept, or, if there is none,
+    * one is built from the detected configuration.
+    *
+    * @param configurationObject Any object to help intialize the operation. ie. Map, JobContext, Properties, etc. The type
+    *                            will be based on where the operation is being run (ie. hadoop, storm, locally, etc.)
+    */
+  override def prepare(configurationObject: Any): Unit = {
+    configurationObject match {
+      case supplied : FullContactConfiguration =>
+        personEnrichment = FullContact.getInstance(supplied)
+      case _ =>
+        if( personEnrichment == null ) {
+          val detected = new ComponentConfigurator[FullContactConfiguration](classOf[FullContactConfiguration]).detectConfiguration()
+          personEnrichment = FullContact.getInstance(detected)
+        }
+    }
+  }
+
+  /**
+    * No guarantee that this method will ever be called.  But upon shutdown of the stream, an attempt to call this method
+    * will be made.
+    * Use this method to terminate connections, etc.
+    * Safe to call more than once.
+    */
+  override def cleanUp(): Unit = {
+    if( personEnrichment != null ) {
+      try personEnrichment.restClient.close()
+      finally personEnrichment = null
+    }
+  }
+}
diff --git a/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/AffinityOrdering.scala b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/AffinityOrdering.scala
new file mode 100644
index 0000000..dabadc1
--- /dev/null
+++ b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/AffinityOrdering.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.fullcontact.util
+
+import org.apache.juneau.ObjectMap
+
+/**
+  * Ordering of `ObjectMap`s by the string stored under their "affinity" key.
+  */
+object AffinityOrdering extends Ordering[ObjectMap] {
+
+  /**
+    * Compares two affinity strings.
+    *
+    * null sorts before everything else, then the empty string; two equal strings
+    * compare as 0. Otherwise only the final character of each string is compared,
+    * ignoring case.
+    */
+  def sortByAffinityString(s1: String, s2: String) : Int = (Option(s1), Option(s2)) match {
+    case (None, None) => 0
+    case (None, Some(_)) => -1
+    case (Some(_), None) => 1
+    case (Some(left), Some(right)) =>
+      if (left == right) 0
+      else if (left.isEmpty) -1
+      else if (right.isEmpty) 1
+      else left.takeRight(1).compareToIgnoreCase(right.takeRight(1))
+  }
+
+  /** Compares two maps by their "affinity" entry, treating a missing entry as "". */
+  def sortByAffinity(o1: ObjectMap, o2: ObjectMap) : Int = {
+    val first = o1.getString("affinity", "")
+    val second = o2.getString("affinity", "")
+    sortByAffinityString(first, second)
+  }
+
+  def compare(a:ObjectMap, b:ObjectMap) = sortByAffinity(a,b)
+}
diff --git a/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/FullContactUtils.scala b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/FullContactUtils.scala
new file mode 100644
index 0000000..e6db251
--- /dev/null
+++ b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/FullContactUtils.scala
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.fullcontact.util
+
+import org.apache.juneau.ObjectList
+import org.apache.juneau.ObjectMap
+import org.apache.juneau.json.JsonParser
+import org.apache.streams.config.ComponentConfigurator
+
+import scala.collection.mutable.ListBuffer
+
+/** Employer identified by name and domain; built from an employment item's name and domain. */
+case class Employer(name : String, domain : String)
+
+/** Pairing of a person id with an employer id. */
+case class EmployerRelationship(personId : String, employerId : String)
+
+/** Interest topic; built from an interest item's id, name and category. */
+case class InterestTopic(id : String, name : String, category : String)
+
+/** Edge from a topic id (`child`) to one of its parent topic ids (`parent`). */
+case class TopicRelationship(child : String, parent : String)
+
+/** Organization identified by name and domain; built from an employment item's name and domain. */
+case class Organization(name : String, domain : String)
+
+/** Pairing of a person id with an organization id. */
+case class OrganizationRelationship(personId : String, orgId : String)
+
+/** Pairing of a person id with the URI of one of that person's profiles. */
+case class PersonProfileRelationship(personId : String, profileUri : String)
+
+/** Image (`url`, `label`) attached to the entity identified by `entityUri`. */
+case class ImageRelationship( entityUri : String, url : String, label : String)
+
+/** Link (`url`, `label`) attached to the entity identified by `entityUri`. */
+case class UrlRelationship( entityUri : String, url : String, label : String)
+
+object FullContactUtils {
+
+  import org.apache.streams.fullcontact.api._
+  import org.apache.streams.fullcontact.config._
+  import org.apache.streams.fullcontact.pojo._
+
+  import scala.collection.JavaConversions._
+  import scala.util.Try
+
+  // FullContact configuration detected through ComponentConfigurator, exposed as an implicit value.
+  final implicit val fullContactConfiguration = new ComponentConfigurator(classOf[FullContactConfiguration]).detectConfiguration()
+
+  // Namespace IRIs, one per social-network provider module; used as the base of profile URIs.
+  val apst_angellist_ns = "http://streams.apache.org/streams-contrib/streams-provider-angellist#"
+  val apst_facebook_ns = "http://streams.apache.org/streams-contrib/streams-provider-facebook#"
+  val apst_foursquare_ns = "http://streams.apache.org/streams-contrib/streams-provider-foursquare#"
+  val apst_googleplus_ns = "http://streams.apache.org/streams-contrib/streams-provider-googleplus#"
+  val apst_instagram_ns = "http://streams.apache.org/streams-contrib/streams-provider-instagram#"
+  val apst_linkedin_ns = "http://streams.apache.org/streams-contrib/streams-provider-linkedin#"
+  val apst_twitter_ns = "http://streams.apache.org/streams-contrib/streams-provider-twitter#"
+
+  // Namespace IRIs for FullContact-derived entities, one per entity kind.
+  val fc_ns = "http://api.fullcontact.com/"
+  val fc_image_ns = "http://api.fullcontact.com/image/"
+  val fc_org_ns = "http://api.fullcontact.com/organization/"
+  val fc_person_ns = "http://api.fullcontact.com/person/"
+  val fc_profile_ns = "http://api.fullcontact.com/profile/"
+  val fc_topic_ns = "http://api.fullcontact.com/topic/"
+  val fc_url_ns = "http://api.fullcontact.com/url/"
+
+  // Prefix labels used when writing prefixed names in Turtle output;
+  // presumably each pairs with the fc_*_ns namespace of the same kind - confirm where the prefixes are declared.
+  val fc_prefix = "fc"
+  val fc_image_prefix = "fc_img"
+  val fc_org_prefix = "fc_org"
+  val fc_person_prefix = "fc_person"
+  val fc_profile_prefix = "fc_profile"
+  val fc_topic_prefix = "fc_topic"
+  val fc_url_prefix = "fc_url"
+
+  /** Identifier for an image: the decimal form of the URL string's hash code. */
+  def imgId( url: String ) : String = Integer.toString(url.hashCode)
+
+  /** Identifier for a link: the decimal form of the URL string's hash code. */
+  def urlId( url: String ) : String = Integer.toString(url.hashCode)
+
+  /** Company name with every punctuation character replaced by a hyphen. */
+  def companyId( company: CompanySummary ) : String = {
+    val name = company.getName
+    name.replaceAll("\\p{Punct}","-")
+  }
+
+  /** Organization domain with every punctuation character replaced by a hyphen. */
+  def orgId( organization: Organization ) : String = {
+    val domain = organization.domain
+    domain.replaceAll("\\p{Punct}", "-")
+  }
+
+  /** Display label: the input with every punctuation character replaced by a space. */
+  def orgLabel( input: String ) : String =
+    input.replaceAll("\\p{Punct}", " ")
+
+  /** Display label derived from the organization's name. */
+  def orgLabel( organization: Organization ) : String = {
+    val name = organization.name
+    orgLabel(name)
+  }
+
+  /** Person identifier: the full name with every non-word character removed. */
+  def personId( input : PersonSummary ) : String = {
+    val fullName = input.getFullName
+    fullName.replaceAll("\\W","")
+  }
+
+  /** The person's full name with every non-word character removed. */
+  def personSafeName( input : PersonSummary ) : String = {
+    val fullName = input.getFullName
+    fullName.replaceAll("\\W","")
+  }
+
+  /** The profile's username with every non-word character removed. */
+  def profileUsername( input : PersonProfile ) : String = {
+    val username = input.getUsername
+    username.replaceAll("\\W","")
+  }
+
+  /**
+    * Namespace IRI and local id for a profile, chosen by its service name.
+    * Instagram profiles are identified by username; every other supported service
+    * uses `profileId`. A service outside the supported set raises a MatchError.
+    */
+  def profileNamespaceAndId( profile : PersonProfile ) : (String,String) = {
+    val service = profile.getService
+    val namespace = service match {
+      case "angellist" => apst_angellist_ns
+      case "twitter" => apst_twitter_ns
+      case "facebook" | "facebookpage" => apst_facebook_ns
+      case "foursquare" => apst_foursquare_ns
+      case "linkedin" => apst_linkedin_ns
+      case "instagram" => apst_instagram_ns
+      case "googleplus" => apst_googleplus_ns
+    }
+    val id = if( service == "instagram" ) profileUsername(profile) else profileId(profile)
+    (namespace, id)
+  }
+
+  /** Concatenates a (namespace, id) pair into a single URI string. */
+  def uriFromNamespaceAndId( ns_id : (String, String) ) = ns_id._1 + ns_id._2
+  
+  /**
+    * Prefix label and type name for a profile, chosen by its service name.
+    * AngelList maps to ("fc","Profile"); every other supported service maps to
+    * the "apst" prefix with a service-specific type name. A service outside the
+    * supported set raises a MatchError.
+    */
+  def profilePrefixAndType( profile : PersonProfile ) : (String,String) = {
+    val typeName = profile.getService match {
+      case "angellist" => "Profile"
+      case "twitter" => "TwitterProfile"
+      case "facebook" | "facebookpage" => "FacebookProfile"
+      case "foursquare" => "FoursquareProfile"
+      case "linkedin" => "LinkedinProfile"
+      case "instagram" => "InstagramProfile"
+      case "googleplus" => "GooglePlusProfile"
+    }
+    // "Profile" is produced only for angellist, the one service under the "fc" prefix.
+    val prefix = if( typeName == "Profile" ) "fc" else "apst"
+    (prefix, typeName)
+  }
+
+  /**
+    * Local id of a profile in the form `service/identifier`.
+    *
+    * The identifier is the user id when present, otherwise the username, otherwise
+    * the last path segment of the profile URL. A null user id or username is treated
+    * the same as an empty one, so the next candidate is tried instead of failing.
+    *
+    * @param profile profile to identify
+    * @return `service/identifier`
+    */
+  def profileId( profile : PersonProfile ) : String = {
+    // None for both null and "" so orElse can move on to the next candidate.
+    def present( value : String ) : Option[String] = Option(value).filter(_.nonEmpty)
+    val idOrUsername = present(profile.getUserid)
+      .orElse(present(profile.getUsername))
+      .getOrElse(profile.getUrl.split("/").last)
+    s"${profile.getService}/${idOrUsername}"
+  }
+
+  /** Identifier of an interest topic (its `id` field). */
+  def topicId( topic : InterestTopic ) : String = topic.id
+
+  /**
+    * Collects the per-service profiles held by `profiles`, in a fixed order
+    * (angellist, facebook, facebookpage, foursquare, googleplus, instagram,
+    * linkedin, twitter).
+    *
+    * Services whose getter returns null are left out, so callers never receive
+    * null elements.
+    *
+    * @param profiles container of per-service profiles
+    * @return the non-null profiles
+    */
+  def selectProfiles( profiles : PersonProfiles) : Seq[PersonProfile] = {
+    Seq[PersonProfile](
+      profiles.getAngellist,
+      profiles.getFacebook,
+      profiles.getFacebookpage,
+      profiles.getFoursquare,
+      profiles.getGoogleplus,
+      profiles.getInstagram,
+      profiles.getLinkedin,
+      profiles.getTwitter
+    ).filter(_ != null)
+  }
+
+  /**
+    * Builds one relationship per selected profile of `summary`, linking the person id
+    * to the profile URI. A profile is skipped when either value cannot be computed,
+    * or is null or empty.
+    */
+  def personProfileRelationships( summary : PersonSummary ) : Seq[PersonProfileRelationship] = {
+    // true when the attempt succeeded with a non-null, non-empty string
+    def usable( attempt : Try[String] ) : Boolean =
+      attempt.isSuccess && attempt.get != null && !attempt.get.isEmpty
+
+    val relationships = new ListBuffer[PersonProfileRelationship]
+    selectProfiles(summary.getDetails.getProfiles()).foreach { profile =>
+      val personAttempt = Try(personId(summary))
+      val uriAttempt = Try(uriFromNamespaceAndId(profileNamespaceAndId(profile)))
+      if( usable(personAttempt) && usable(uriAttempt) ) {
+        relationships += PersonProfileRelationship(personAttempt.get, uriAttempt.get)
+      }
+    }
+    relationships
+  }
+
+  /** Every interest item of every person, in input order. */
+  def allInterestItems( input : Iterator[PersonSummary] ) : Iterator[PersonInterestItem] = {
+    for {
+      person <- input
+      interest <- person.getDetails.getInterests.toSeq
+    } yield interest
+  }
+
+  /** Distinct topics (id, name, category) found among the interest items. */
+  def uniqueInterests( input : Iterator[PersonInterestItem] ) : Iterator[InterestTopic] = {
+    val topics = for( item <- input ) yield InterestTopic(item.getId, item.getName, item.getCategory)
+    topics.toSet.iterator
+  }
+
+  /** Distinct child-to-parent edges: one per (item id, parent id) pair. */
+  def topicHierarchy( input : Iterator[PersonInterestItem] ) : Iterator[TopicRelationship] = {
+    val edges = for {
+      item <- input
+      parentId <- item.getParentIds
+    } yield TopicRelationship(item.getId, parentId)
+    edges.toSet.iterator
+  }
+
+  /**
+    * One relationship per photo of every person, linking the person's URI to the
+    * photo's value and label. Photos whose relationship cannot be built are skipped.
+    */
+  def allImageRelationships( input : Iterator[PersonSummary] ) : Iterator[ImageRelationship] = {
+    for {
+      person <- input
+      photo <- person.getDetails.getPhotos
+      relationship <- Try(ImageRelationship(uriFromNamespaceAndId((fc_person_ns, personId(person))), photo.getValue, photo.getLabel)).toOption
+    } yield relationship
+  }
+
+  /**
+    * One relationship per url of every person, linking the person's URI to the
+    * url's value and label. Urls whose relationship cannot be built are skipped.
+    */
+  def allUrlRelationships( input : Iterator[PersonSummary] ) : Iterator[UrlRelationship] = {
+    for {
+      person <- input
+      link <- person.getDetails.getUrls
+      relationship <- Try(UrlRelationship(uriFromNamespaceAndId((fc_person_ns, personId(person))), link.getValue, link.getLabel)).toOption
+    } yield relationship
+  }
+
+  /** Distinct organizations (name, domain) found in the employment history of all persons. */
+  def allOrganizationItems( input : Iterator[PersonSummary] ) : Iterator[Organization] = {
+    val organizations = for {
+      person <- input
+      job <- person.getDetails.getEmployment
+    } yield Organization(job.getName, job.getDomain)
+    organizations.toSet.iterator
+  }
+
+  /** Every selected profile of every person, in input order. */
+  def allProfiles( input : Iterator[PersonSummary] ) : Iterator[PersonProfile] = {
+    for {
+      summary <- input
+      profile <- selectProfiles(summary.getDetails.getProfiles)
+    } yield profile
+  }
+
+  /** Person-to-profile relationships of every person, in input order. */
+  def allProfileRelationships( input : Iterator[PersonSummary] ) : Iterator[PersonProfileRelationship] = {
+    input.flatMap( summary => personProfileRelationships(summary) )
+  }
+
+  /** Every employment item of every person, in input order. */
+  def allEmploymentItems( input : Iterator[PersonSummary] ) : Iterator[PersonEmploymentItem] = {
+    for {
+      person <- input
+      job <- person.getDetails.getEmployment.toSeq
+    } yield job
+  }
+
+  /** Distinct employers (name, domain) found among the employment items. */
+  def uniqueEmployers( input : Iterator[PersonEmploymentItem] ) : Iterator[Employer] = {
+    val employers = for( job <- input ) yield Employer(job.getName, job.getDomain)
+    employers.toSet.iterator
+  }
+
+  /**
+    * Serializes a person summary as Turtle statements about `fc_person:<personId>`.
+    *
+    * The first statement block carries display name, website, modified date, vcard name,
+    * organization label and title. Separate statements follow for gender (when non-empty)
+    * and for each email, phone and url found in the details.
+    *
+    * Values are escaped so that quotes, backslashes and line breaks inside them cannot
+    * terminate or corrupt the surrounding string literal. A null gender, null details or
+    * null email/phone/url list simply contributes no statements.
+    *
+    * @param root person summary to serialize
+    * @return Turtle text
+    */
+  def personSummaryAsTurtle(root: PersonSummary): String = {
+    // Escape a value for use inside a double-quoted Turtle literal.
+    // Null values render as "null", as plain string interpolation would.
+    def lit(value: Any): String = s"${value}"
+      .replace("\\", "\\\\")
+      .replace("\"", "\\\"")
+      .replace("\n", "\\n")
+      .replace("\r", "\\r")
+
+    val id = personId(root)
+    val sb = new StringBuilder()
+    sb.append(s"""|${fc_person_prefix}:${id}
+                  |      a ${fc_prefix}:Person ;
+                  |      as:displayName "${lit(root.getFullName)}" ;
+                  |      as:url "${lit(root.getWebsite)}" ;
+                  |      dct:modified "${lit(root.getUpdated)}" ;
+                  |      vcard:fn "${lit(root.getFullName)}" ;
+                  |      vcard:org "${lit(orgLabel(root.getOrganization))}" ;
+                  |      vcard:title "${lit(root.getTitle)}" ;
+                  |      .
+                  |
+                  |""".stripMargin)
+    Option(root.getGender).filter(gender => !gender.isEmpty).foreach { gender =>
+      sb.append(s"""${fc_person_prefix}:${id} vcard:gender "${lit(gender)}" . """).append("\n")
+    }
+    Option(root.getDetails).foreach { details =>
+      Option(details.getEmails).foreach { emails =>
+        emails.foreach (
+          email => {
+            sb.append(s"""${fc_person_prefix}:${id} vcard:email "mailto:${lit(email.getValue)}" . """).append("\n")
+          }
+        )
+      }
+      Option(details.getPhones).foreach { phones =>
+        phones.foreach (
+          tel => {
+            sb.append(s"""${fc_person_prefix}:${id} vcard:tel "tel:${lit(tel.getValue)}" . """).append("\n")
+          }
+        )
+      }
+      Option(details.getUrls).foreach { urls =>
+        urls.foreach (
+          url => {
+            sb.append(s"""${fc_person_prefix}:${id} vcard:url "${lit(url.getValue)}" . """).append("\n")
+          }
+        )
+      }
+    }
+    sb.toString
+  }
+
+  /**
+   * Exception-swallowing variant of personSummaryAsTurtle.
+   *
+   * @param root person summary to serialize
+   * @return Some(turtle) when serialization completes, None when it throws
+   */
+  def safe_personSummaryAsTurtle(root: PersonSummary): Option[String] = {
+    val attempt = Try { personSummaryAsTurtle(root) }
+    attempt.toOption
+  }
+
+  /**
+   * Serializes a url relationship as Turtle.
+   *
+   * Emits an as:Link resource (identified by fc_url_ns + urlId(url)) carrying the
+   * href and rel, followed by an as:link statement from the owning entity to it.
+   *
+   * @param item relationship between an entity and a url
+   * @return the Turtle text for the link and its attachment to the entity
+   */
+  def urlRelationshipAsTurtle(item: UrlRelationship) : String = {
+    val linkUri = fc_url_ns + urlId(item.url)
+    s"""|<${linkUri}>
+        |   a as:Link ;
+        |   as:href "${item.url}" ;
+        |   as:rel "${item.label}" .
+        |
+        |<${item.entityUri}> as:link <${linkUri}> .
+        |
+        |""".stripMargin
+  }
+
+  /**
+   * Exception-swallowing variant of urlRelationshipAsTurtle.
+   *
+   * @param item relationship between an entity and a url
+   * @return Some(turtle) when serialization completes, None when it throws
+   */
+  def safe_urlRelationshipAsTurtle(item: UrlRelationship) : Option[String] = {
+    val attempt = Try { urlRelationshipAsTurtle(item) }
+    attempt.toOption
+  }
+
+  /**
+   * Serializes an image relationship as Turtle.
+   *
+   * Emits an as:Image resource (identified by fc_url_ns + urlId(url)) carrying the
+   * href and rel, followed by an as:image statement from the owning entity to it.
+   *
+   * @param item relationship between an entity and an image url
+   * @return the Turtle text for the image and its attachment to the entity
+   */
+  def imageRelationshipAsTurtle(item: ImageRelationship) : String = {
+    val imageUri = fc_url_ns + urlId(item.url)
+    s"""|<${imageUri}>
+        |   a as:Image ;
+        |   as:href "${item.url}" ;
+        |   as:rel "${item.label}" .
+        |
+        |<${item.entityUri}> as:image <${imageUri}> .
+        |
+        |""".stripMargin
+  }
+
+  /**
+   * Exception-swallowing variant of imageRelationshipAsTurtle.
+   *
+   * @param item relationship between an entity and an image url
+   * @return Some(turtle) when serialization completes, None when it throws
+   */
+  def safe_imageRelationshipAsTurtle(item: ImageRelationship) : Option[String] = {
+    val attempt = Try { imageRelationshipAsTurtle(item) }
+    attempt.toOption
+  }
+
+  /**
+   * Serializes an interest topic as a skos:Concept with a preferred label.
+   *
+   * Punctuation characters in the topic name are replaced by spaces before the
+   * name is written as the skos:prefLabel literal.
+   *
+   * @param topic topic to serialize
+   * @return the Turtle text declaring the concept and its label
+   */
+  def interestTopicAsTurtle(topic: InterestTopic): String = {
+    val label = topic.name.replaceAll("\\p{Punct}"," ")
+    val subject = s"${fc_topic_prefix}:${topic.id}"
+    s"""|${subject} a skos:Concept .
+        |${subject} skos:prefLabel "${label}" .
+        |
+        |""".stripMargin
+  }
+
+  /**
+   * Exception-swallowing variant of interestTopicAsTurtle.
+   *
+   * @param topic topic to serialize
+   * @return Some(turtle) when serialization completes, None when it throws
+   */
+  def safe_interestTopicAsTurtle(topic: InterestTopic): Option[String] = {
+    val attempt = Try { interestTopicAsTurtle(topic) }
+    attempt.toOption
+  }
+
+  /**
+   * Serializes a topic relationship as a single skos:broader statement (child broader parent).
+   *
+   * @param relationship child/parent topic pair
+   * @return one Turtle statement, without a trailing line break
+   */
+  def topicRelationshipAsTurtle(relationship: TopicRelationship): String = {
+    val childTopic = s"${fc_topic_prefix}:${relationship.child}"
+    val parentTopic = s"${fc_topic_prefix}:${relationship.parent}"
+    s"${childTopic} skos:broader ${parentTopic} ."
+  }
+
+  /**
+   * Serializes a person's interests as foaf:interest statements, one per line.
+   *
+   * @param root person summary whose details hold the interests
+   * @return the concatenated statements, or an empty string when the interest
+   *         list is null or empty
+   */
+  def personInterestsAsTurtle(root: PersonSummary): String = {
+    val id = personId(root)
+    val interests = root.getDetails.getInterests
+    if( interests == null || interests.isEmpty) ""
+    else interests.map(
+      interest => s"""${fc_person_prefix}:${id} foaf:interest ${fc_topic_prefix}:${interest.getId} . """ + "\n"
+    ).mkString
+  }
+
+  /**
+   * Exception-swallowing variant of personInterestsAsTurtle.
+   *
+   * @param root person summary whose interests are serialized
+   * @return Some(turtle) when serialization completes, None when it throws
+   */
+  def safe_personInterestsAsTurtle(root: PersonSummary): Option[String] = {
+    val attempt = Try { personInterestsAsTurtle(root) }
+    attempt.toOption
+  }
+
+  /**
+   * Serializes an organization as a Turtle statement about `fc_org_prefix:<orgId>`.
+   *
+   * The statement carries the type, the display name (from orgLabel) and the
+   * organization's domain as its url. Values placed inside double-quoted
+   * literals are escaped so that a quote, backslash or line break in the data
+   * still yields parseable Turtle.
+   *
+   * @param organization organization to serialize
+   * @return the Turtle text describing the organization
+   */
+  def organizationAsTurtle(organization: Organization): String = {
+    // Escape backslash, double quote and line breaks so an interpolated value
+    // cannot terminate or corrupt the double-quoted Turtle literal around it.
+    def lit(value: Any): String =
+      s"$value".replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "\\r")
+
+    val id = orgId(organization)
+    s"""|${fc_org_prefix}:${id}
+        |      a ${fc_prefix}:Organization ;
+        |      as:displayName "${lit(orgLabel(organization))}" ;
+        |      as:url "${lit(organization.domain)}" ;
+        |      .
+        |
+        |""".stripMargin
+  }
+
+  /**
+   * Exception-swallowing variant of organizationAsTurtle.
+   *
+   * @param organization organization to serialize
+   * @return Some(turtle) when serialization completes, None when it throws
+   */
+  def safe_organizationAsTurtle(organization: Organization): Option[String] = {
+    val attempt = Try { organizationAsTurtle(organization) }
+    attempt.toOption
+  }
+
+  /**
+   * Serializes a person/profile relationship as a single as:describes statement
+   * from the profile URI to the person.
+   *
+   * @param relationship profile URI and person id pair
+   * @return one Turtle statement, without a trailing line break
+   */
+  def personProfileRelationshipAsTurtle(relationship: PersonProfileRelationship): String = {
+    val profileNode = s"<${relationship.profileUri}>"
+    val personNode = s"${fc_person_prefix}:${relationship.personId}"
+    s"${profileNode} as:describes ${personNode} ."
+  }
+
+  /**
+   * Exception-swallowing variant of personProfileRelationshipAsTurtle.
+   *
+   * @param relationship profile URI and person id pair
+   * @return Some(turtle) when serialization completes, None when it throws
+   */
+  def safe_personProfileRelationshipAsTurtle(relationship: PersonProfileRelationship): Option[String] = {
+    val attempt = scala.util.Try { personProfileRelationshipAsTurtle(relationship) }
+    attempt.toOption
+  }
+
+  /**
+   * Serializes a social profile as one Turtle statement about the profile URI.
+   *
+   * The statement carries the type (from profilePrefixAndType), id, user name
+   * (as both as:name and as:displayName), bio with punctuation replaced by
+   * spaces, url and service, plus follower/following counts when they are
+   * present. The statement is closed with a terminating '.', so it can be
+   * concatenated with other statements.
+   *
+   * A profile without a bio is still serialized; only its as:summary line is
+   * left out. Values placed inside double-quoted literals are escaped so that a
+   * quote, backslash or line break in the data still yields parseable Turtle.
+   *
+   * @param profile profile to serialize
+   * @return the Turtle text describing the profile
+   */
+  def profileAsTurtle(profile: PersonProfile): String = {
+    // Escape backslash, double quote and line breaks so an interpolated value
+    // cannot terminate or corrupt the double-quoted Turtle literal around it.
+    def lit(value: Any): String =
+      s"$value".replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "\\r")
+
+    val id = profileId(profile)
+    val uri = uriFromNamespaceAndId(profileNamespaceAndId(profile))
+    val prefix_type = profilePrefixAndType(profile)
+    // The bio is optional; wrapping it avoids a NullPointerException that would discard the whole profile.
+    val bio = Option(profile.getBio).map(_.replaceAll("\\p{Punct}"," "))
+    val sb = new StringBuilder()
+    sb.append(s"""|<$uri>
+                  |      a ${prefix_type._1}:${prefix_type._2} ;
+                  |      as:id "${lit(id)}" ;
+                  |      as:name "${lit(profile.getUsername)}" ;
+                  |      as:displayName "${lit(profile.getUsername)}" ;
+                  |""".stripMargin)
+    bio.foreach(text => sb.append(s"""      as:summary "${lit(text)}" ;""").append("\n"))
+    sb.append(s"""      as:url "${lit(profile.getUrl)}" ;""").append("\n")
+    sb.append(s"""      as:provider "${lit(profile.getService)}" ;""").append("\n")
+    if( profile.getFollowers != null)
+      sb.append(s"""      apst:followers "${profile.getFollowers}"^^xs:integer ;""").append("\n")
+    if( profile.getFollowing != null)
+      sb.append(s"""      apst:following "${profile.getFollowing}"^^xs:integer ;""").append("\n")
+    // Close the statement; Turtle permits the preceding trailing ';' before the final '.'.
+    sb.append("      .").append("\n")
+    sb.append("\n")
+    sb.toString
+  }
+
+  /**
+   * Exception-swallowing variant of profileAsTurtle.
+   *
+   * @param profile profile to serialize
+   * @return Some(turtle) when serialization completes, None when it throws
+   */
+  def safe_profileAsTurtle(profile: PersonProfile): Option[String] = {
+    val attempt = Try { profileAsTurtle(profile) }
+    attempt.toOption
+  }
+
+
+  /**
+   * Serializes a company summary as Turtle statements about `fc_org_prefix:<companyId>`.
+   *
+   * The first statement carries the type, name, website, bio with punctuation
+   * replaced by spaces, employee count, founding value and category. One further
+   * statement is appended for every email, phone and url found in the details.
+   *
+   * A company without a bio is still serialized; only its as:summary line is
+   * left out. A null details object or contact list is skipped instead of
+   * raising a NullPointerException. Values placed inside double-quoted literals
+   * are escaped so that a quote, backslash or line break in the data still
+   * yields parseable Turtle.
+   *
+   * @param root company summary to serialize
+   * @return the Turtle text describing the company
+   */
+  def companySummaryAsTurtle(root: CompanySummary): String = {
+    import scala.collection.JavaConversions._
+    // Escape backslash, double quote and line breaks so an interpolated value
+    // cannot terminate or corrupt the double-quoted Turtle literal around it.
+    def lit(value: Any): String =
+      s"$value".replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "\\r")
+
+    val id = companyId(root)
+    val subject = s"${fc_org_prefix}:${id}"
+    // The bio is optional; wrapping it avoids a NullPointerException that would discard the whole company.
+    val bio = Option(root.getBio).map(_.replaceAll("\\p{Punct}"," "))
+    val sb = new StringBuilder()
+    sb.append(s"""|${subject}
+                  |      a ${fc_prefix}:Organization ;
+                  |      as:displayName "${lit(root.getName)}" ;
+                  |      as:url "${lit(root.getWebsite)}" ;
+                  |""".stripMargin)
+    bio.foreach(text => sb.append(s"""      as:summary "${lit(text)}" ;""").append("\n"))
+    sb.append(s"""|      ${fc_prefix}:employees "${root.getEmployees}"^^xs:integer ;
+                  |      ${fc_prefix}:founded "${root.getFounded}"^^xs:integer ;
+                  |      vcard:category "${lit(root.getCategory)}" ;
+                  |      .
+                  |""".stripMargin)
+    // Details and each contact list may be absent; iterate only over what is present.
+    Option(root.getDetails).foreach { details =>
+      Option(details.getEmails).foreach(emails => emails.foreach { email =>
+        sb.append(s"""${subject} vcard:email "mailto:${lit(email.getValue)}" . """).append("\n")
+      })
+      Option(details.getPhones).foreach(phones => phones.foreach { phone =>
+        sb.append(s"""${subject} vcard:tel "tel:${lit(phone.getValue)}" . """).append("\n")
+      })
+      Option(details.getUrls).foreach(urls => urls.foreach { url =>
+        sb.append(s"""${subject} vcard:url "${lit(url.getUrl)}" . """).append("\n")
+      })
+    }
+    sb.toString
+  }
+
+  /**
+   * Exception-swallowing variant of companySummaryAsTurtle.
+   *
+   * @param profile company summary to serialize
+   * @return Some(turtle) when serialization completes, None when it throws
+   */
+  def safe_companySummaryAsTurtle(profile: CompanySummary): Option[String] = {
+    val attempt = Try { companySummaryAsTurtle(profile) }
+    attempt.toOption
+  }
+
+  /**
+   * Collapses a dash-separated age range such as "25-34" into a single age.
+   *
+   * The input is split on "-", every part is parsed as a whole number, and the
+   * integer (truncating) mean of the parts is returned. A single number is
+   * returned unchanged.
+   *
+   * @param ageRange range text, e.g. "25-34"
+   * @return Some(mean) when every part parses, None for null, empty or
+   *         non-numeric input
+   */
+  def normalize_age_range(ageRange: String)  : Option[Int] =
+    Try {
+      val bounds = ageRange.split("-").map(bound => bound.toLong)
+      val total = bounds.foldLeft(0L)(_ + _)
+      (total / bounds.length).toInt
+    }.toOption
+
+  /**
+   * Posts an enrich-person request to the FullContact v3 `person.enrich` endpoint
+   * and returns the raw response body.
+   *
+   * A new REST client is built for every call, authorized with the bearer token
+   * from the implicit configuration, and closed again before the method returns
+   * so its HTTP resources are not leaked.
+   *
+   * @param request the enrichment request to send as the JSON body
+   * @param config  configuration supplying the API token
+   * @return the response body as a string
+   */
+  def callEnrichPerson(request : EnrichPersonRequest)(implicit config : FullContactConfiguration) : String = {
+    import java.net.URI
+
+    import org.apache.http.client.utils.URIBuilder
+    import org.apache.juneau.json.JsonParser
+    import org.apache.juneau.json.JsonSerializer
+    import org.apache.juneau.rest.client.RestCall
+    import org.apache.juneau.rest.client.RestClient
+    val auth_header = s"Bearer ${config.getToken()}"
+    val url = "https://api.fullcontact.com/v3/person.enrich"
+    val uri : URI = new URIBuilder(url).build()
+    // Both are consumed unconditionally a few lines below, so there is nothing to gain from laziness.
+    val parser = JsonParser.create().
+      debug().
+      ignoreUnknownBeanProperties(true).
+      ignorePropertiesWithoutSetters(true).
+      build()
+    val serializer = JsonSerializer.create().
+      debug().
+      trimEmptyCollections(true).
+      trimNullProperties(true).
+      trimEmptyMaps(true).
+      build()
+    val restClient = RestClient.
+      create().
+      authorization(auth_header).
+      beansRequireSerializable(true).
+      debug().
+      disableAutomaticRetries().
+      disableCookieManagement().
+      disableRedirectHandling().
+      json().
+      parser(parser).
+      rootUrl(uri).
+      serializer(serializer).
+      build()
+
+    try {
+      val post : RestCall = restClient.
+        doPost(uri).
+        body(request)
+
+      // NOTE(review): fixed one-second pause before the response is read,
+      // presumably a crude rate limit for the API; confirm before changing.
+      Thread.sleep(1000)
+
+      post.getResponseAsString
+    } finally {
+      // A failure while closing must not mask the response or the original exception.
+      Try(restClient.close())
+    }
+  }
+
+  /**
+   * Exception-swallowing variant of callEnrichPerson.
+   *
+   * @param request the enrichment request to send
+   * @return Some(response body) when the call completes, None when it throws
+   */
+  def safeCallEnrichPerson(request : EnrichPersonRequest) : Option[String] = {
+    val attempt = Try { callEnrichPerson(request) }
+    attempt.toOption
+  }
+
+  /**
+   * Parses an enrich-person JSON response into a PersonSummary using the default JSON parser.
+   *
+   * @param response JSON text returned by the enrich call
+   * @return the parsed person summary
+   */
+  def parseEnrichPersonResponse(response : String): PersonSummary = {
+    import org.apache.juneau.json.JsonParser
+    JsonParser.DEFAULT.parse(response, classOf[PersonSummary])
+  }
+
+  /**
+   * Exception-swallowing variant of parseEnrichPersonResponse.
+   *
+   * @param response JSON text returned by the enrich call
+   * @return Some(summary) when parsing completes, None when it throws
+   */
+  def safeParseEnrichPersonResponse(response : String): Option[PersonSummary] = {
+    val attempt = Try { parseEnrichPersonResponse(response) }
+    attempt.toOption
+  }
+
+  /**
+   * Formats one education entry as "name degree (startYear - endYear)".
+   *
+   * @param row education entry; "name" and "degree" default to an empty string
+   * @return Some(text), or None when reading or formatting the entry throws
+   */
+  def educationItem(row : ObjectMap) : Option[String] = Try {
+    val school = row.getString("name","")
+    val degree = row.getString("degree","")
+    val startYear = row.getAt("start/year",classOf[String])
+    val endYear = row.getAt("end/year",classOf[String])
+    f"""${school}%s ${degree}%s (${startYear}%s - ${endYear}%s)"""
+  }.toOption
+  /**
+   * Turns a JSON array of education entries into a list of one-line summaries.
+   *
+   * The JSON is parsed outside the Try, so a parse failure propagates to the
+   * caller; a failure while walking the entries yields an empty list.
+   *
+   * @param json JSON array text
+   * @return the formatted entries, or an empty list when none could be produced
+   */
+  def educationSummary(json : String) : List[String] = {
+    val rows = JsonParser.DEFAULT.parse(json, classOf[ObjectList])
+    val attempt = Try(rows.elements(classOf[ObjectMap]).flatMap( row => educationItem(row) ))
+    attempt.toOption.map(_.toList).getOrElse(Nil)
+  }
+
+  /**
+   * Formats one employment entry as "name title (startYear - endYear)".
+   *
+   * @param row employment entry; "name" and "title" default to an empty string
+   * @return Some(text), or None when reading or formatting the entry throws
+   */
+  def employmentItem(row : ObjectMap) : Option[String] = Try {
+    val employer = row.getString("name","")
+    val title = row.getString("title","")
+    val startYear = row.getAt("start/year",classOf[String])
+    val endYear = row.getAt("end/year",classOf[String])
+    f"""${employer}%s ${title}%s (${startYear}%s - ${endYear})"""
+  }.toOption
+  /**
+   * Turns a JSON array of employment entries into a list of one-line summaries.
+   *
+   * The JSON is parsed outside the Try, so a parse failure propagates to the
+   * caller; a failure while walking the entries yields an empty list.
+   *
+   * @param json JSON array text
+   * @return the formatted entries, or an empty list when none could be produced
+   */
+  def employmentSummary(json : String) : List[String] = {
+    val rows = JsonParser.DEFAULT.parse(json, classOf[ObjectList])
+    val attempt = Try(rows.elements(classOf[ObjectMap]).flatMap( row => employmentItem(row)))
+    attempt.toOption.map(_.toList).getOrElse(Nil)
+  }
+
+  /**
+   * Formats one interest entry as "name (affinity)".
+   *
+   * @param row interest entry; "name" and "affinity" default to an empty string
+   * @return Some(text), or None when reading or formatting the entry throws
+   */
+  def interestItem(row : ObjectMap) : Option[String] = Try {
+    val name = row.getString("name","")
+    val affinity = row.getString("affinity","")
+    f"""${name}%s (${affinity}%s)"""
+  }.toOption
+  /**
+   * Turns a JSON array of interest entries into a list of one-line summaries,
+   * with the entries ordered by AffinityOrdering before formatting.
+   *
+   * The JSON is parsed outside the Try, so a parse failure propagates to the
+   * caller; a failure while sorting or walking the entries yields an empty list.
+   *
+   * @param json JSON array text
+   * @return the formatted entries, or an empty list when none could be produced
+   */
+  def interestSummary(json : String) : List[String] = {
+    val rows = JsonParser.DEFAULT.parse(json, classOf[ObjectList])
+    val attempt = Try(rows.elements(classOf[ObjectMap]).toSeq.sorted(AffinityOrdering).flatMap( row => interestItem(row)))
+    attempt.toOption.map(_.toList).getOrElse(Nil)
+  }
+
+  /**
+   * Extracts the "url" value of a profile entry as text.
+   *
+   * @param row profile entry
+   * @return Some(url text), or None when reading or formatting the entry throws
+   */
+  def profileItem(row : ObjectMap) : Option[String] = Try {
+    val url = row.get("url")
+    f"""${url}%s"""
+  }.toOption
+  /**
+   * Turns a JSON object of profile entries into a sorted list of profile urls.
+   *
+   * Each value of the object is treated as an ObjectMap and its url extracted.
+   * The JSON is parsed outside the Try, so a parse failure propagates to the
+   * caller; a failure while walking the entries yields an empty list.
+   *
+   * @param json JSON object text
+   * @return the sorted urls, or an empty list when none could be produced
+   */
+  def profileSummary(json : String) : List[String] = {
+    val profiles = JsonParser.DEFAULT.parse(json, classOf[ObjectMap])
+    val attempt = Try(profiles.entrySet().flatMap( row => profileItem(row.getValue().asInstanceOf[ObjectMap])).toSeq.sorted)
+    attempt.toOption.map(_.toList).getOrElse(Nil)
+  }
+
+  /**
+   * Extracts the "value" field of a url entry as text.
+   *
+   * @param row url entry
+   * @return Some(value text), or None when reading or formatting the entry throws
+   */
+  def urlItem(row : ObjectMap) : Option[String] = Try {
+    val value = row.get("value")
+    f"""${value}%s"""
+  }.toOption
+  /**
+   * Turns a JSON array of url entries into a sorted list of url values.
+   *
+   * The JSON is parsed outside the Try, so a parse failure propagates to the
+   * caller; a failure while walking the entries yields an empty list.
+   *
+   * @param json JSON array text
+   * @return the sorted values, or an empty list when none could be produced
+   */
+  def urlSummary(json : String) : List[String] = {
+    val rows = JsonParser.DEFAULT.parse(json, classOf[ObjectList])
+    val attempt = Try(rows.elements(classOf[ObjectMap]).flatMap( row => urlItem(row)).toSeq.sorted)
+    attempt.toOption.map(_.toList).getOrElse(Nil)
+  }
+
+}